# Lab 4: DataFrame API Introduction - Solutions

**Objective**: Master Spark's DataFrame API and understand its advantages over RDDs.

**Learning Outcomes**:
- Understand DataFrame structure and schema benefits
- Convert between RDDs and DataFrames
- Apply DataFrame transformations and actions
- Leverage Catalyst optimizer advantages
- Work with structured and semi-structured data

**Estimated Time**: 45 minutes

---

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, avg, count, max as spark_max, when, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import time

spark = SparkSession.builder \
    .appName("Lab4-DataFrame-API") \
    .config("spark.sql.adaptive.logLevel", "ERROR") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "1000") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
    
sc = spark.sparkContext
sc.setLogLevel("ERROR")  # Suppress INFO/WARN log noise for cleaner output

print(f"🚀 Spark {spark.version} - DataFrame API Lab")

# Show the Spark UI URL
ui_url = spark.sparkContext.uiWebUrl
print(f"Spark UI: {ui_url}")
print("💡 In GitHub Codespaces: check the 'PORTS' tab for forwarded port 4040 to reach the Spark UI")

## Part 1: DataFrame Fundamentals
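
With `inferSchema=True`, Spark must read the data once just to work out column types before the real query runs; an explicit schema, as in Exercise 1.1, skips that pass. The toy below illustrates the idea in plain Python: it scans every row and widens each column from int to double to string as needed. It is not Spark's inference code (which also recognises booleans, timestamps and nulls), and `infer_column_types` is a hypothetical name.

```python
import csv
import io


def _value_type(value: str) -> str:
    """Classify one CSV cell as 'int', 'double' or 'string' (toy rules)."""
    try:
        int(value)
        return "int"
    except ValueError:
        pass
    try:
        float(value)
        return "double"
    except ValueError:
        return "string"


# Widening order: a column's type can only move to the right
_WIDENING = ["int", "double", "string"]


def infer_column_types(csv_text: str) -> dict:
    """Hypothetical toy: scan every row and keep the widest type seen per column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    types = {}
    for row in reader:  # every row is read: this is the extra pass inference costs
        for name, value in row.items():
            seen = _value_type(value)
            current = types.get(name, "int")
            types[name] = max(current, seen, key=_WIDENING.index)
    return types


sample = "transaction_id,amount,category\nT1,10,books\nT2,12.5,toys\n"
print(infer_column_types(sample))
```

The `amount` column starts as int on the first row and is widened to double by `12.5`, which is why the whole column has to be seen before a type can be fixed.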

In [None]:
# Load data as DataFrames
customers_df = spark.read.csv("../Datasets/customers.csv", header=True, inferSchema=True)
transactions_df = spark.read.csv("../Datasets/customer_transactions.csv", header=True, inferSchema=True)

print("📊 DataFrames loaded with inferred schemas:")
customers_df.printSchema()
transactions_df.printSchema()

**Exercise 1.1**: Create DataFrames with explicit schemas and compare with RDD operations.
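
The RDD path in this exercise splits each line with `line.split(',')`, which only works while no field contains a quoted comma; `spark.read.csv` handles quoting for you. If the RDD route has to stay, Python's standard `csv` module can parse one line at a time. A minimal sketch, where `parse_transaction_csv` is a hypothetical drop-in for `parse_transaction` and the column positions (1 = customer_id, 2 = amount) are assumed to match the file:

```python
import csv


def parse_transaction_csv(line: str) -> tuple:
    """Hypothetical helper: parse one CSV line, honouring quotes.

    Assumes column 1 is customer_id and column 2 is amount, as in parse_transaction.
    """
    fields = next(csv.reader([line]))  # csv.reader accepts any iterable of lines
    return (fields[1], float(fields[2]))


line = 'T1,C42,199.99,"Home, Garden",card,2024-01-05,false,true'
print(line.split(',')[3])          # naive split cuts the quoted category in half
print(parse_transaction_csv(line))
```

In the RDD pipeline it would simply replace the parsing step, i.e. `.map(parse_transaction_csv)` instead of `.map(parse_transaction)`.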

In [None]:
# Solution: Create explicit schema for transactions
transaction_schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("category", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("is_weekend", StringType(), True),
    StructField("discount_applied", StringType(), True)
])

# Solution: Load with explicit schema
transactions_explicit = spark.read.csv(
    "../Datasets/customer_transactions.csv", 
    header=True, 
    schema=transaction_schema
)

# Solution: Compare DataFrame vs RDD operations
# DataFrame approach
df_high_value = transactions_explicit.filter(col("amount") > 100).select("customer_id", "amount")

# RDD approach: parse each CSV line by hand
# (a plain split(',') assumes no field contains a quoted comma)
def parse_transaction(line):
    fields = line.split(',')
    return (fields[1], float(fields[2]))  # customer_id, amount

rdd_high_value = spark.sparkContext.textFile("../Datasets/customer_transactions.csv") \
    .filter(lambda line: not line.startswith('transaction_id')) \
    .map(parse_transaction) \
    .filter(lambda x: x[1] > 100)

# Solution: Time both approaches
# (single runs are noisy, and the first action also pays warm-up and file-read costs)
print("⏱️  Performance comparison:")

start_time = time.time()
df_count = df_high_value.count()
df_time = time.time() - start_time

start_time = time.time()
rdd_count = rdd_high_value.count()
rdd_time = time.time() - start_time

print(f"DataFrame approach: {df_count} records in {df_time:.4f}s")
print(f"RDD approach: {rdd_count} records in {rdd_time:.4f}s")
print(f"RDD time / DataFrame time: {rdd_time/df_time:.1f}x (>1 means the DataFrame was faster)")

# Reuse the counts computed above instead of re-running both jobs
assert df_count == rdd_count, "Both approaches should return the same count"
print("✓ Exercise 1.1 completed!")

## Part 2: DataFrame Operations and SQL Integration

In [None]:
# Register DataFrames as temp views for SQL
customers_df.createOrReplaceTempView("customers")
transactions_df.createOrReplaceTempView("transactions")

# DataFrame API vs SQL comparison
print("📊 DataFrame API vs SQL Analysis")

# DataFrame API approach
df_analysis = transactions_df.groupBy("category") \
    .agg(spark_sum("amount").alias("total_amount"), 
         avg("amount").alias("avg_amount"), 
         count("*").alias("transaction_count")) \
    .orderBy(col("total_amount").desc())

# SQL approach
sql_analysis = spark.sql("""
    SELECT category,
           SUM(amount) as total_amount,
           AVG(amount) as avg_amount,
           COUNT(*) as transaction_count
    FROM transactions
    GROUP BY category
    ORDER BY total_amount DESC
""")

print("DataFrame API results:")
df_analysis.show()
print("SQL results:")
sql_analysis.show()

**Exercise 2.1**: Build complex queries using both DataFrame API and SQL.
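
Both solutions below aggregate twice: first to a total per customer, then to a summary per state. A plain-Python reference of the same two stages is handy for checking Spark's output on a few hand-written rows. This is a sketch under assumptions: the sample rows are made up, the $1000 threshold comes from the requirements, customers without transactions drop out just as they do with the inner join, and `state_summary` is a hypothetical name.

```python
from collections import defaultdict


def state_summary(customers: dict, transactions: list, threshold: float = 1000.0) -> dict:
    """Hypothetical reference implementation.

    customers: customer_id -> state; transactions: (customer_id, amount) pairs.
    """
    # Stage 1: total spent per customer (inner join: unknown customers are skipped)
    totals = defaultdict(float)
    for customer_id, amount in transactions:
        if customer_id in customers:
            totals[customer_id] += amount

    # Stage 2: group the per-customer totals by state
    by_state = defaultdict(list)
    for customer_id, total in totals.items():
        by_state[customers[customer_id]].append(total)

    summary = {}
    for state, spent in by_state.items():
        high_value = sum(1 for t in spent if t > threshold)
        summary[state] = {
            "total_customers": len(spent),
            "high_value_customers": high_value,
            "avg_spending_per_customer": sum(spent) / len(spent),
            "high_value_percentage": high_value * 100.0 / len(spent),
        }
    return summary


customers = {"C1": "CA", "C2": "CA", "C3": "NY"}
transactions = [("C1", 800.0), ("C1", 400.0), ("C2", 50.0), ("C3", 1500.0), ("C9", 99.0)]
for state, row in sorted(state_summary(customers, transactions).items()):
    print(state, row)
```

Note that `C1` only becomes high-value after its two transactions are summed, which is why the threshold must be applied after the per-customer aggregation and not to individual transactions.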

In [None]:
# Solution: Customer analysis - high-value customers by state
# Requirements: Join customers and transactions, calculate total spending per customer,
# classify as high-value (>$1000), group by state, show counts and averages

print("🎯 Complex Analysis Challenge")

# DataFrame API approach
df_customer_analysis = transactions_df.join(customers_df, "customer_id") \
    .groupBy("customer_id", "state") \
    .agg(spark_sum("amount").alias("total_spent")) \
    .withColumn("is_high_value", when(col("total_spent") > 1000, 1).otherwise(0)) \
    .groupBy("state") \
    .agg(
        count("*").alias("total_customers"),
        spark_sum("is_high_value").alias("high_value_customers"),
        avg("total_spent").alias("avg_spending_per_customer")
    ) \
    .withColumn("high_value_percentage", 
               (col("high_value_customers") / col("total_customers") * 100)) \
    .orderBy(col("high_value_percentage").desc())

# SQL approach
sql_customer_analysis = spark.sql("""
    WITH customer_spending AS (
        SELECT 
            c.customer_id,
            c.state,
            SUM(t.amount) as total_spent
        FROM customers c
        JOIN transactions t ON c.customer_id = t.customer_id
        GROUP BY c.customer_id, c.state
    ),
    state_summary AS (
        SELECT 
            state,
            COUNT(*) as total_customers,
            SUM(CASE WHEN total_spent > 1000 THEN 1 ELSE 0 END) as high_value_customers,
            AVG(total_spent) as avg_spending_per_customer
        FROM customer_spending
        GROUP BY state
    )
    SELECT 
        *,
        (high_value_customers * 100.0 / total_customers) as high_value_percentage
    FROM state_summary
    ORDER BY high_value_percentage DESC
""")

print("DataFrame API approach:")
df_customer_analysis.show()

print("SQL approach:")
sql_customer_analysis.show()

# Solution: Sanity-check that both approaches agree on the number of states
# (this compares row counts only, not the individual values)
df_count = df_customer_analysis.count()
sql_count = sql_customer_analysis.count()
assert df_count == sql_count, "Both approaches should return the same number of states"

print("✓ Exercise 2.1 completed!")
print(f"Both approaches returned {df_count} states")

## Part 3: Advanced DataFrame Features
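
The ranking cell below keeps rows with `rank` <= 3, so a state can return more than three customers when totals tie. The plain-Python sketch models how `row_number`, `rank` and `dense_rank` number an already-sorted list that contains ties; it mimics the semantics of those Spark window functions and is not Spark code (`ranking_columns` is a hypothetical name).

```python
def ranking_columns(sorted_values: list) -> list:
    """Return (value, row_number, rank, dense_rank) for values sorted in descending order."""
    rows = []
    rank = dense = 0
    previous = object()  # sentinel that compares unequal to every real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position  # rank jumps past the tied positions
            dense += 1       # dense_rank moves up by exactly one
            previous = value
        rows.append((value, position, rank, dense))
    return rows


totals = [900, 700, 500, 500]
for row in ranking_columns(totals):
    print(row)
```

Here a `rank <= 3` filter keeps four rows while `row_number <= 3` keeps exactly three. In Spark, which of the tied rows gets the lower `row_number` is not guaranteed unless the window's `orderBy` includes a tie-breaking column.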

In [None]:
# Window functions and advanced operations
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

# Customer ranking within each state
customer_totals = transactions_df.join(customers_df, "customer_id") \
    .groupBy("customer_id", "name", "state") \
    .agg(spark_sum("amount").alias("total_spent"))

window_spec = Window.partitionBy("state").orderBy(col("total_spent").desc())

ranked_customers = customer_totals.withColumn("rank", rank().over(window_spec)) \
    .withColumn("row_number", row_number().over(window_spec)) \
    .filter(col("rank") <= 3)

print("🏆 Top 3 customers per state (ties in rank can add extra rows):")
ranked_customers.orderBy("state", "rank").show()

**Exercise 3.1**: Implement advanced DataFrame operations.
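
The solution cell below combines a running `sum` over `rowsBetween(Window.unboundedPreceding, Window.currentRow)` with `lag`, which fetches the previous row in the window. Here is a plain-Python model of that per-customer computation, assuming one customer's monthly totals are already sorted by (year, month) like the window's `orderBy`; `monthly_trends` is a hypothetical name. The first month gets 0.0 growth, matching the solution's `when(col("prev_month_total").isNull(), 0.0)`.

```python
def monthly_trends(monthly_totals: list) -> list:
    """Hypothetical model of the window logic for a single customer.

    monthly_totals: [((year, month), total), ...] sorted by (year, month).
    Returns (year_month, total, running_total, growth_pct) tuples.
    """
    rows = []
    running = 0.0
    previous = None  # plays the role of lag("monthly_total")
    for year_month, total in monthly_totals:
        running += total  # frame: unboundedPreceding .. currentRow
        if previous is None:
            growth = 0.0  # first month, as in the when(...isNull(), 0.0) branch
        elif previous == 0:
            growth = None  # undefined: Spark yields null or raises, depending on ANSI mode
        else:
            growth = (total - previous) / previous * 100
        rows.append((year_month, total, running, growth))
        previous = total
    return rows


history = [((2024, 1), 200.0), ((2024, 2), 300.0), ((2024, 4), 150.0)]
for row in monthly_trends(history):
    print(row)
```

April is compared with February because March has no row for this customer; `lag` works on rows, not calendar months, so gaps in activity show up as growth between non-adjacent months.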

In [None]:
# Solution: Advanced analytics with window functions
# 1. Calculate running totals by customer
# 2. Find month-over-month growth rates
# 3. Identify anomaly transactions (significantly above customer's average)

from pyspark.sql.functions import month, year, lag
from pyspark.sql.window import Window

# Solution: Create date-based analysis
transactions_with_date = transactions_df.withColumn("year", year(col("transaction_date"))) \
    .withColumn("month", month(col("transaction_date")))

# Solution: Monthly customer spending
monthly_spending = transactions_with_date.groupBy("customer_id", "year", "month") \
    .agg(spark_sum("amount").alias("monthly_total"))

# Solution: Add running total and growth rate using window functions
customer_window = Window.partitionBy("customer_id").orderBy("year", "month")

# Note: lag() returns the previous row in the window, i.e. the customer's previous
# *active* month, which is not necessarily the preceding calendar month
customer_trends = monthly_spending \
    .withColumn("running_total", 
               spark_sum("monthly_total").over(customer_window.rowsBetween(Window.unboundedPreceding, Window.currentRow))) \
    .withColumn("prev_month_total", 
               lag("monthly_total").over(customer_window)) \
    .withColumn("month_over_month_growth",
               when(col("prev_month_total").isNull(), 0.0)
               .otherwise((col("monthly_total") - col("prev_month_total")) / col("prev_month_total") * 100))

# Solution: Show results
print("📈 Customer spending trends:")
customer_trends.filter(col("customer_id") == "CUST_000001") \
    .orderBy("year", "month") \
    .show()

# Additional analysis: Identify high-growth customers
high_growth_customers = customer_trends \
    .filter(col("month_over_month_growth") > 50) \
    .groupBy("customer_id") \
    .agg(
        count("*").alias("high_growth_months"),
        avg("month_over_month_growth").alias("avg_growth_rate")
    ) \
    .filter(col("high_growth_months") >= 2) \
    .orderBy(col("avg_growth_rate").desc())

print("\n🚀 High-growth customers (>50% growth for 2+ months):")
high_growth_customers.show(10)

# Validation
trends_count = customer_trends.count()
growth_count = high_growth_customers.count()
assert trends_count > 0, "Should have customer trends"
# growth_count may legitimately be 0, so it is reported rather than asserted

print("✓ Exercise 3.1 completed!")
print(f"Analyzed {trends_count} customer-month combinations")
print(f"Identified {growth_count} high-growth customers")
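
Requirement 3 of Exercise 3.1 (anomaly transactions, significantly above the customer's average) is not implemented in the solution above. In Spark, one option is to compare each `amount` with `avg("amount").over(Window.partitionBy("customer_id"))`. The plain-Python sketch below models that logic; the 2x multiplier is an assumption, since the exercise does not define "significantly", and `flag_anomalies` is a hypothetical name.

```python
from collections import defaultdict


def flag_anomalies(transactions: list, multiplier: float = 2.0) -> list:
    """Hypothetical sketch. transactions: (transaction_id, customer_id, amount) tuples.

    Returns (transaction_id, customer_id, amount, customer_avg) for every transaction
    whose amount exceeds multiplier x that customer's average (multiplier is assumed).
    """
    # Per-customer average, like avg("amount").over(Window.partitionBy("customer_id"))
    sums = defaultdict(float)
    counts = defaultdict(int)
    for _, customer_id, amount in transactions:
        sums[customer_id] += amount
        counts[customer_id] += 1

    anomalies = []
    for transaction_id, customer_id, amount in transactions:
        customer_avg = sums[customer_id] / counts[customer_id]
        if amount > multiplier * customer_avg:
            anomalies.append((transaction_id, customer_id, amount, customer_avg))
    return anomalies


transactions_sample = [
    ("T1", "C1", 20.0), ("T2", "C1", 25.0), ("T3", "C1", 30.0), ("T4", "C1", 325.0),
    ("T5", "C2", 100.0), ("T6", "C2", 110.0),
]
print(flag_anomalies(transactions_sample))
```

Because each transaction is included in its own customer's average, a large spike partly masks itself (T4 lifts C1's average from 25 to 100); excluding the current row from the average, or comparing against a median, is a common refinement.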

## Summary: DataFrame API Benefits

### Key Advantages:
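1. **Schema Awareness**: Column names and types are known to Spark, which catches bad column references at analysis time and enables optimization
2. **Catalyst Optimizer**: Automatic query optimization
3. **SQL Integration**: Familiar syntax for analysts
4. **Performance**: Usually faster than RDDs for structured data, because Catalyst plans the query and built-in functions run in the JVM rather than in Python worker processes
5. **Expressiveness**: Rich API for complex operations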

### When to Use DataFrames vs RDDs:
- **DataFrames**: Structured/semi-structured data, SQL-like operations, performance critical
- **RDDs**: Unstructured data, complex transformations, functional programming style

In [None]:
spark.stop()
print("🎉 Lab 4 completed! DataFrame API mastered.")
print("➡️  Next: Lab 5 - Spark SQL Basics")