# Lab 6: DataFrame Operations - Solutions

**Objective**: Master advanced DataFrame operations including joins, aggregations, and data manipulation.

**Learning Outcomes**:
- Perform complex joins and unions
- Apply advanced aggregation patterns
- Manipulate DataFrame schemas and data types
- Handle missing data and data quality issues
- Optimize DataFrame operations for performance

**Estimated Time**: 55 minutes

---

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

spark = SparkSession.builder \
    .appName("Lab6-DataFrame-Ops") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.logLevel", "ERROR") \
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "1000") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .getOrCreate()
    
sc = spark.sparkContext
sc.setLogLevel("ERROR")  # Suppress warnings for cleaner output

print(f"🚀 DataFrame Operations Lab - Spark {spark.version}")

# Enhanced Spark UI URL display
ui_url = spark.sparkContext.uiWebUrl
print(f"Spark UI: {ui_url}")
print("💡 In GitHub Codespaces: check the 'PORTS' tab for forwarded port 4040 to access the Spark UI")

## Part 1: Advanced Join Operations

In [None]:
# Load all datasets
customers_df = spark.read.csv("../Datasets/customers.csv", header=True, inferSchema=True)
transactions_df = spark.read.csv("../Datasets/customer_transactions.csv", header=True, inferSchema=True)
products_df = spark.read.csv("../Datasets/product_catalog.csv", header=True, inferSchema=True)
social_users_df = spark.read.csv("../Datasets/social_media_users.csv", header=True, inferSchema=True)

print("📊 Datasets loaded for advanced operations")
print(f"  - Customers: {customers_df.count():,} records")
print(f"  - Transactions: {transactions_df.count():,} records")
print(f"  - Products: {products_df.count():,} records")
print(f"  - Social Users: {social_users_df.count():,} records")

**Exercise 1.1**: Implement complex join patterns and analyze performance.
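
The month-over-month comparison in this exercise relies on a self-join that pairs each customer's month with the month immediately before it. The sketch below replays that matching rule in plain Python on made-up figures (the customer ID, months, and amounts are illustrative assumptions, not lab data). It shows one consequence worth keeping in mind: a month that follows a gap has no partner row, so an inner self-join drops it.

```python
# Plain-Python sketch of the "previous month + 1 == current month" join rule.
# All figures are invented for illustration.

def next_month(year_month: str) -> str:
    """Return the month after a 'yyyy-MM' string."""
    year, month = map(int, year_month.split("-"))
    year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return f"{year:04d}-{month:02d}"

# (customer_id, year_month) -> monthly_spending; note there is no row for 2024-03
monthly_spending = {
    ("C1", "2023-12"): 100.0,
    ("C1", "2024-01"): 150.0,
    ("C1", "2024-02"): 120.0,
    ("C1", "2024-04"): 300.0,
}

growth = {}
for (customer, month), previous_amount in monthly_spending.items():
    current_key = (customer, next_month(month))
    if current_key in monthly_spending:  # inner-join semantics: keep matches only
        current_amount = monthly_spending[current_key]
        growth[current_key] = (current_amount - previous_amount) / previous_amount * 100

for key, rate in sorted(growth.items()):
    print(key, f"{rate:+.1f}%")
```

In this sketch 2024-04 gets no growth rate because 2024-03 is absent. A `lag` over an ordered window behaves differently: it compares against the previous available row, whatever the gap.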

In [None]:
# Solution: Advanced Join Challenge

# Challenge 1: Multiple table joins with different join types
print("🔗 Complex Multi-table Join Analysis")

# Step 1: Customer transaction summary
customer_summary = transactions_df.groupBy("customer_id") \
    .agg(
        F.sum("amount").alias("total_spent"),
        F.count("*").alias("transaction_count"),
        F.avg("amount").alias("avg_transaction"),
        F.countDistinct("category").alias("categories_purchased"),
        F.max("transaction_date").alias("last_transaction_date"),
        F.min("transaction_date").alias("first_transaction_date")
    )

print(f"✅ Customer summary created: {customer_summary.count():,} customers")

# Step 2: Product category insights
category_products = products_df.groupBy("category") \
    .agg(
        F.count("*").alias("product_count"),
        F.avg("price").alias("avg_product_price"),
        F.sum("stock_quantity").alias("total_inventory"),
        F.collect_list("name").alias("product_names")
    )

print(f"✅ Category insights created: {category_products.count():,} categories")

# Step 3: Build comprehensive customer 360 profile
# First, let's check and adapt to the actual social media data schema
print("📋 Social media dataset schema:")
social_users_df.printSchema()

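# Create a compatible social media dataset for joining.
# Assumes social user_id values line up with customer_id. platform and posts_count
# are filled with placeholder values, and engagement_rate is only a proxy
# (following/followers as a percentage), not a measured engagement rate.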
social_adapted = social_users_df \
    .withColumnRenamed("user_id", "customer_id") \
    .withColumn("platform", F.lit("Social Platform")) \
    .withColumnRenamed("follower_count", "followers_count") \
    .withColumn("posts_count", F.lit(0)) \
    .withColumn("engagement_rate", 
               F.when(F.col("followers_count") > 0, 
                     (F.col("following_count") / F.col("followers_count")) * 100)
               .otherwise(0.0))

customer_360 = customers_df \
    .join(customer_summary, "customer_id", "inner") \
    .join(social_adapted.select("customer_id", "platform", "followers_count", "posts_count", "engagement_rate", "verified"), 
          "customer_id", "left") \
    .withColumn("customer_segment",
        F.when(F.col("total_spent") > 5000, "VIP")
        .when(F.col("total_spent") > 2000, "High Value")
        .when(F.col("total_spent") > 500, "Medium Value")
        .otherwise("Low Value")
    ) \
    .withColumn("days_as_customer",
        F.datediff(F.col("last_transaction_date"), F.col("first_transaction_date"))
    ) \
    .withColumn("has_social_presence", 
        F.col("platform").isNotNull()
    ) \
    .withColumn("is_verified_user",
        F.coalesce(F.col("verified"), F.lit(False))
    )

print("📈 Customer 360 Profile Sample:")
customer_360.select(
    "customer_id", "name", "state", "customer_segment", 
    "total_spent", "transaction_count", "categories_purchased",
    "has_social_presence", "followers_count", "engagement_rate", "is_verified_user"
).show(10)

# Challenge 2: Self-joins for time-series analysis
print("\n📊 Spending Growth Analysis:")

# Create monthly spending by customer
monthly_trends = transactions_df \
    .withColumn("year_month", F.date_format(F.col("transaction_date"), "yyyy-MM")) \
    .groupBy("customer_id", "year_month") \
    .agg(
        F.sum("amount").alias("monthly_spending"),
        F.count("*").alias("monthly_transactions")
    )

# Self-join to compare consecutive months
spending_growth = monthly_trends.alias("current") \
    .join(
        monthly_trends.alias("previous"),
        (F.col("current.customer_id") == F.col("previous.customer_id")) &
        (F.add_months(F.to_date(F.col("previous.year_month"), "yyyy-MM"), 1) ==
         F.to_date(F.col("current.year_month"), "yyyy-MM")),
        "inner"
    ) \
    .select(
        F.col("current.customer_id"),
        F.col("current.year_month"),
        F.col("current.monthly_spending"),
        F.col("previous.monthly_spending").alias("prev_month_spending"),
        ((F.col("current.monthly_spending") - F.col("previous.monthly_spending")) /
         F.col("previous.monthly_spending") * 100).alias("growth_rate")
    ) \
    .filter(F.col("growth_rate") > 20)  # Show customers with >20% growth

print("Customers with significant month-over-month growth (>20%):")
spending_growth.orderBy(F.col("growth_rate").desc()).show(10)

# Join performance analysis
print("\n🔧 Join Performance Analysis:")

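# Test broadcast join vs regular join.
# Note: the state == category key is artificial and only exercises the join machinery.
# Single-run wall-clock timings are noisy (the first run also pays warm-up costs), and
# Spark may broadcast the small table in the "regular" join as well.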
start_time = time.time()
broadcast_join = customers_df.join(F.broadcast(category_products), 
                                 customers_df.state == category_products.category, "left")
broadcast_count = broadcast_join.count()
broadcast_time = time.time() - start_time

start_time = time.time()
regular_join = customers_df.join(category_products, 
                               customers_df.state == category_products.category, "left")
regular_count = regular_join.count()
regular_time = time.time() - start_time

print(f"Broadcast join: {broadcast_count:,} results in {broadcast_time:.4f}s")
print(f"Regular join: {regular_count:,} results in {regular_time:.4f}s")

# Validation
customer_360_count = customer_360.count()
growth_customers = spending_growth.count()

assert customer_360_count > 0, "Should have customer 360 profiles"
assert growth_customers >= 0, "Should have growth analysis (may be 0)"

print(f"\n✓ Exercise 1.1 completed!")
print(f"📊 Created {customer_360_count:,} customer profiles, identified {growth_customers} high-growth customers")

## Part 2: Advanced Aggregations and Grouping

In [None]:
# Pivot tables and cube/rollup operations
print("📋 Advanced Aggregation Patterns")

# Pivot analysis - sales by state and category
sales_pivot = transactions_df.join(customers_df, "customer_id") \
    .groupBy("state") \
    .pivot("category") \
    .agg(F.sum("amount").alias("total_sales"))

print("🔄 Pivot Table - Sales by State and Category:")
sales_pivot.show()

# Cube operation for multi-dimensional analysis
sales_cube = transactions_df.join(customers_df, "customer_id") \
    .cube("state", "category") \
    .agg(F.sum("amount").alias("total_sales"), 
         F.count("*").alias("transaction_count")) \
    .orderBy("state", "category")

print("\n🎲 Cube Analysis - All Combinations:")
sales_cube.show(20)

**Exercise 2.1**: Build sophisticated aggregation pipelines.
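
`cube` and `rollup` differ only in which grouping sets they aggregate over: `rollup` takes the hierarchical prefixes of the column list, while `cube` takes every subset. The plain-Python sketch below enumerates both. The helper names `rollup_sets` and `cube_sets` are made up for illustration and are not Spark APIs.

```python
from itertools import combinations

def rollup_sets(columns):
    """Grouping sets for rollup: every prefix of the column list, longest first."""
    return [tuple(columns[:n]) for n in range(len(columns), -1, -1)]

def cube_sets(columns):
    """Grouping sets for cube: every subset of the columns, largest first."""
    return [subset
            for n in range(len(columns), -1, -1)
            for subset in combinations(columns, n)]

# rollup("year", "quarter", "month") -> 4 grouping sets (n + 1)
print(rollup_sets(["year", "quarter", "month"]))

# cube("state", "category") -> 4 grouping sets (2 ** n)
print(cube_sets(["state", "category"]))
```

Columns left out of a grouping set show up as null in the result, which is why subtotal and grand-total rows carry nulls in those positions.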

In [None]:
# Solution: Advanced Aggregation Challenge

# Challenge 1: Multi-level rollups
print("📅 Time-based Rollup Analysis")

# Add time components
transactions_with_time = transactions_df \
    .withColumn("year", F.year(F.col("transaction_date"))) \
    .withColumn("quarter", F.quarter(F.col("transaction_date"))) \
    .withColumn("month", F.month(F.col("transaction_date")))

# Create rollup by year, quarter, month
time_rollup = transactions_with_time \
    .rollup("year", "quarter", "month") \
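    .agg(
        F.sum("amount").alias("total_sales"),
        F.count("*").alias("transaction_count"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.avg("amount").alias("avg_transaction_amount"),
        # grouping_id() bitmask: 0 = month detail, 1 = month rolled up,
        # 3 = quarter and month rolled up, 7 = everything rolled up.
        # Unlike isNull() checks, it is not confused by null transaction dates.
        F.grouping_id().alias("grouping_level")
    ) \
    .withColumn("aggregation_level",
        F.when(F.col("grouping_level") == 7, "Grand Total")
        .when(F.col("grouping_level") == 3, "Year Total")
        .when(F.col("grouping_level") == 1, "Quarter Total")
        .otherwise("Month Detail")
    ) \
    .drop("grouping_level") \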
    .orderBy("year", "quarter", "month")

print("Time-based rollup results:")
time_rollup.show(25)

# Challenge 2: Percentile and statistical aggregations
print("\n📊 Customer Segment Statistics:")

# First, create customer segments
customer_segments = customers_df \
    .join(transactions_df.groupBy("customer_id").agg(F.sum("amount").alias("total_spent")), "customer_id") \
    .withColumn("age_group",
        F.when(F.col("age") < 25, "18-24")
        .when(F.col("age") < 35, "25-34")
        .when(F.col("age") < 45, "35-44")
        .when(F.col("age") < 55, "45-54")
        .otherwise("55+")
    ) \
    .withColumn("spending_tier",
        F.when(F.col("total_spent") > 3000, "High")
        .when(F.col("total_spent") > 1000, "Medium")
        .otherwise("Low")
    )

# Calculate advanced statistics by segment
segment_stats = customer_segments \
    .groupBy("age_group", "spending_tier") \
    .agg(
        F.count("*").alias("customer_count"),
        F.avg("total_spent").alias("avg_spending"),
        F.stddev("total_spent").alias("spending_stddev"),
        F.min("total_spent").alias("min_spending"),
        F.max("total_spent").alias("max_spending"),
        F.expr("percentile_approx(total_spent, 0.25)").alias("p25_spending"),
        F.expr("percentile_approx(total_spent, 0.5)").alias("median_spending"),
        F.expr("percentile_approx(total_spent, 0.75)").alias("p75_spending"),
        F.expr("percentile_approx(total_spent, 0.95)").alias("p95_spending")
    ) \
    .withColumn("coefficient_of_variation", 
               F.col("spending_stddev") / F.col("avg_spending")) \
    .orderBy("age_group", "spending_tier")

print("Statistical analysis by customer segment:")
segment_stats.show()

# Challenge 3: Moving averages and window aggregations
print("\n📈 Moving Average Analysis:")

# Monthly sales by category
monthly_sales = transactions_df \
    .withColumn("year_month", F.date_format(F.col("transaction_date"), "yyyy-MM")) \
    .groupBy("category", "year_month") \
    .agg(F.sum("amount").alias("monthly_sales")) \
    .orderBy("category", "year_month")

# Define window for moving average
window_spec = Window.partitionBy("category") \
    .orderBy("year_month") \
    .rowsBetween(-2, 0)  # 3-month window (current + 2 previous)

# Calculate moving averages
moving_averages = monthly_sales \
    .withColumn("moving_avg_3month", F.avg("monthly_sales").over(window_spec)) \
    .withColumn("moving_sum_3month", F.sum("monthly_sales").over(window_spec)) \
    .withColumn("sales_rank", 
               F.rank().over(Window.partitionBy("category").orderBy(F.col("monthly_sales").desc()))) \
    .withColumn("sales_vs_avg", 
               F.col("monthly_sales") - F.col("moving_avg_3month")) \
    .withColumn("trend_indicator",
        F.when(F.col("sales_vs_avg") > 0, "Above Average")
        .when(F.col("sales_vs_avg") < 0, "Below Average")
        .otherwise("At Average")
    )

print("Moving average analysis by category:")
moving_averages.show(20)

# Advanced window analysis - lag/lead functions
trend_analysis = monthly_sales \
    .withColumn("prev_month_sales", 
               F.lag("monthly_sales").over(Window.partitionBy("category").orderBy("year_month"))) \
    .withColumn("next_month_sales", 
               F.lead("monthly_sales").over(Window.partitionBy("category").orderBy("year_month"))) \
    .withColumn("month_over_month_change",
               (F.col("monthly_sales") - F.col("prev_month_sales")) / F.col("prev_month_sales") * 100) \
    .withColumn("trend_direction",
        F.when(F.col("month_over_month_change") > 10, "Strong Growth")
        .when(F.col("month_over_month_change") > 0, "Growth")
        .when(F.col("month_over_month_change") > -10, "Stable")
        .otherwise("Decline")
    )

print("\n📊 Trend Analysis Results:")
trend_analysis.filter(F.col("prev_month_sales").isNotNull()).show(15)

# Validation
rollup_count = time_rollup.count()
segments_count = segment_stats.count()
moving_avg_count = moving_averages.count()

assert rollup_count > 0, "Should have rollup results"
assert segments_count > 0, "Should have segment statistics"
assert moving_avg_count > 0, "Should have moving average data"

print(f"\n✓ Exercise 2.1 completed!")
print(f"📈 Generated {rollup_count} rollup aggregations, {segments_count} segment analyses, {moving_avg_count} trend points")
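
`rowsBetween(-2, 0)` counts rows, not calendar months. The plain-Python sketch below applies the same trailing three-row window to invented figures (the months and sales values are assumptions for illustration). It shows two effects: the first rows average fewer than three values, and a missing month stretches the window across more than three calendar months.

```python
# Plain-Python sketch of a trailing 3-row window, mirroring rowsBetween(-2, 0).
# Months and sales figures are invented; 2024-03 is deliberately missing.
monthly_sales = [
    ("2024-01", 100.0),
    ("2024-02", 200.0),
    ("2024-04", 300.0),
    ("2024-05", 400.0),
]

moving_avg = []
for i, (month, _) in enumerate(monthly_sales):
    window = monthly_sales[max(0, i - 2): i + 1]  # current row plus up to 2 before it
    values = [sales for _, sales in window]
    months_in_window = [m for m, _ in window]
    moving_avg.append((month, months_in_window, sum(values) / len(values)))

for month, months_in_window, avg in moving_avg:
    print(month, months_in_window, round(avg, 2))
```

For 2024-04 the window reaches back to 2024-01, so the "3-month" average actually spans four calendar months. If strict calendar windows matter, one option is to fill in the missing months before applying the window.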

## Part 3: Data Quality and Schema Operations

In [None]:
# Data quality assessment and cleanup
print("🔍 Data Quality Assessment")

# Check for missing values
def analyze_missing_data(df, df_name):
    print(f"\n{df_name} Missing Data Analysis:")
    total_rows = df.count()
    col_types = dict(df.dtypes)  # column name -> Spark SQL type string, e.g. 'string'
    
    for col_name in df.columns:
        # For string columns, check for both null and empty string
        if col_types[col_name] == 'string':
            null_count = df.filter(F.col(col_name).isNull() | (F.col(col_name) == "")).count()
        else:
            # For non-string columns (numeric, date, etc.), only check for null
            null_count = df.filter(F.col(col_name).isNull()).count()
            
        null_pct = (null_count / total_rows) * 100 if total_rows > 0 else 0
        print(f"  {col_name}: {null_count} nulls ({null_pct:.1f}%)")
    
    return total_rows

# Analyze all datasets
datasets = [
    (customers_df, "Customers"),
    (transactions_df, "Transactions"),
    (products_df, "Products"),
    (social_users_df, "Social Users")
]

total_records = 0
for df, name in datasets:
    total_records += analyze_missing_data(df, name)

print(f"\n📊 Total records analyzed: {total_records:,}")

# Data type conversions and schema modifications
print("\n🔧 Schema Operations")

# Add derived columns with proper data types
enhanced_transactions = transactions_df \
    .withColumn("amount_category", 
        F.when(F.col("amount") < 50, "Low")
        .when(F.col("amount") < 200, "Medium")
        .otherwise("High")) \
    .withColumn("transaction_month", F.date_format(F.col("transaction_date"), "yyyy-MM")) \
    .withColumn("is_weekend_bool", 
        F.when(F.col("is_weekend") == "true", True)
        .when(F.col("is_weekend") == "false", False)
        .otherwise(None).cast(BooleanType())) \
    .withColumn("amount_usd", F.round(F.col("amount"), 2)) \
    .withColumn("transaction_hour", F.hour(F.col("transaction_date"))) \
    .withColumn("days_since_transaction", 
               F.datediff(F.current_date(), F.col("transaction_date")))

print("Enhanced schema:")
enhanced_transactions.printSchema()

print("\n📋 Enhanced transactions sample:")
enhanced_transactions.select(
    "customer_id", "amount", "amount_category", "transaction_month", 
    "is_weekend_bool", "transaction_hour", "days_since_transaction"
).show(5)

**Exercise 3.1**: Implement comprehensive data quality pipeline.
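
One rule used in this pipeline is the IQR fence: values below Q1 - 1.5 × IQR or above Q3 + 1.5 × IQR are flagged as outliers. The worked example below runs that arithmetic in plain Python on invented amounts, using `statistics.quantiles` as a stand-in for Spark's `percentile_approx`. This is a sketch of the rule, not of the Spark implementation.

```python
import statistics

# Invented transaction amounts; 95 is far from the rest
amounts = [10, 12, 14, 15, 16, 18, 20, 95]

# Quartiles via linear interpolation over the sample (an assumption of this
# sketch; Spark's percentile_approx may return slightly different values)
q1, _, q3 = statistics.quantiles(amounts, n=4, method="inclusive")

iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr

outliers = [x for x in amounts if x < lower_bound or x > upper_bound]

print(f"Q1={q1}, Q3={q3}, IQR={iqr}")
print(f"bounds=({lower_bound}, {upper_bound}), outliers={outliers}")
```

Here Q1 = 13.5 and Q3 = 18.5, so the fences sit at 6.0 and 26.0 and only 95 falls outside them.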

In [None]:
# Solution: Data Quality Pipeline Challenge

class DataQualityPipeline:
    def __init__(self, spark_session):
        self.spark = spark_session
        self.quality_reports = []
    
    def validate_schema(self, df, expected_schema):
        """Validate DataFrame against expected schema"""
        actual_schema = df.schema
        issues = []
        
        # Check for missing columns
        expected_cols = {field.name: field.dataType for field in expected_schema.fields}
        actual_cols = {field.name: field.dataType for field in actual_schema.fields}
        
        missing_cols = set(expected_cols.keys()) - set(actual_cols.keys())
        extra_cols = set(actual_cols.keys()) - set(expected_cols.keys())
        
        if missing_cols:
            issues.append(f"Missing columns: {missing_cols}")
        
        if extra_cols:
            issues.append(f"Extra columns: {extra_cols}")
        
        # Check data types for common columns
        for col_name in set(expected_cols.keys()) & set(actual_cols.keys()):
            if str(expected_cols[col_name]) != str(actual_cols[col_name]):
                issues.append(f"Type mismatch for {col_name}: expected {expected_cols[col_name]}, got {actual_cols[col_name]}")
        
        return issues
    
    def detect_outliers(self, df, column, method='iqr'):
        """Detect outliers in numeric columns"""
        if method == 'iqr':
            # Calculate IQR-based outliers
            quartiles = df.select(
                F.expr(f"percentile_approx({column}, 0.25)").alias("q1"),
                F.expr(f"percentile_approx({column}, 0.75)").alias("q3")
            ).collect()[0]
            
            q1, q3 = quartiles['q1'], quartiles['q3']
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            
            outliers_df = df.withColumn("is_outlier", 
                (F.col(column) < lower_bound) | (F.col(column) > upper_bound))
            
        elif method == 'zscore':
            # Calculate Z-score based outliers
            stats = df.select(F.avg(column).alias("mean"), F.stddev(column).alias("stddev")).collect()[0]
            mean_val, stddev_val = stats['mean'], stats['stddev']
            
            outliers_df = df.withColumn("z_score", 
                F.abs(F.col(column) - F.lit(mean_val)) / F.lit(stddev_val)) \
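                .withColumn("is_outlier", F.col("z_score") > 3)
        
        else:
            # Fail fast instead of hitting an undefined outliers_df below
            raise ValueError(f"Unknown outlier method: {method!r} (expected 'iqr' or 'zscore')")
        
        return outliers_df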
    
    def standardize_formats(self, df):
        """Standardize data formats"""
        standardized = df
        
        # Clean email format if email column exists
        if "email" in df.columns:
            standardized = standardized.withColumn("email_clean", 
                F.lower(F.trim(F.col("email"))))
        
        # Standardize name format if name column exists
        if "name" in df.columns:
            standardized = standardized.withColumn("name_standardized", 
                F.initcap(F.trim(F.col("name"))))
        
        # Standardize phone format if phone column exists
        if "phone" in df.columns:
            standardized = standardized.withColumn("phone_clean",
                F.regexp_replace(F.col("phone"), "[^0-9]", ""))
        
        # Standardize state codes
        if "state" in df.columns:
            standardized = standardized.withColumn("state_code", 
                F.upper(F.trim(F.col("state"))))
        
        return standardized
    
    def handle_missing_values(self, df, strategy='smart'):
        """Handle missing values with different strategies"""
        if strategy == 'smart':
            # Smart imputation logic based on column types and business logic
            cleaned = df
            
            # Numeric columns - use median
            numeric_cols = [field.name for field in df.schema.fields 
                          if isinstance(field.dataType, (IntegerType, LongType, DoubleType, FloatType))]
            
            for col_name in numeric_cols:
                if col_name in ['age', 'amount']:
                    median_val = df.select(F.expr(f"percentile_approx({col_name}, 0.5)")).collect()[0][0]
                    # The median is None when the column has no non-null values
                    if median_val is not None:
                        cleaned = cleaned.fillna({col_name: median_val})
            
            # Categorical columns - fill with default values
            categorical_mappings = {
                'state': 'Unknown',
                'category': 'General',
                'payment_method': 'Unknown',
                'gender': 'Unknown'
            }
            
            for col_name, default_val in categorical_mappings.items():
                if col_name in df.columns:
                    cleaned = cleaned.fillna({col_name: default_val})
            
        else:
            # Simple strategies
            if strategy == 'drop':
                cleaned = df.dropna()
            elif strategy == 'zero':
                cleaned = df.fillna(0)
            else:
                cleaned = df.fillna('Unknown')
        
        return cleaned
    
    def validate_business_rules(self, df):
        """Validate business logic constraints"""
        violations = []
        
        # Rule 1: Negative amounts
        if "amount" in df.columns:
            negative_amounts = df.filter(F.col("amount") < 0)
            violations.append(("Negative amounts", negative_amounts.count()))
        
        # Rule 2: Invalid ages
        if "age" in df.columns:
            invalid_ages = df.filter((F.col("age") < 18) | (F.col("age") > 120))
            violations.append(("Invalid ages", invalid_ages.count()))
        
        # Rule 3: Future transaction dates
        if "transaction_date" in df.columns:
            future_dates = df.filter(F.col("transaction_date") > F.current_date())
            violations.append(("Future dates", future_dates.count()))
        
        # Rule 4: Invalid email formats
        if "email" in df.columns:
            invalid_emails = df.filter(~F.col("email").rlike(r"^[\w\.-]+@[\w\.-]+\.\w+$"))
            violations.append(("Invalid emails", invalid_emails.count()))
        
        total_violations = sum(count for _, count in violations)
        print(f"⚠️  Found {total_violations} total business rule violations:")
        for rule, count in violations:
            if count > 0:
                print(f"  - {rule}: {count} violations")
        
        return violations
    
    def generate_quality_report(self, df, df_name, key_columns=None):
        """Generate comprehensive quality report.
        
        key_columns: optional list of columns that should uniquely identify a row.
        When omitted, duplicates are counted on whole rows.
        """
        total_rows = df.count()
        total_columns = len(df.columns)
        col_types = dict(df.dtypes)
        
        # Calculate missing value percentages with type-safe null checking
        null_counts = {}
        for col_name in df.columns:
            # For string columns, check for both null and empty string
            if col_types[col_name] == 'string':
                null_count = df.filter(F.col(col_name).isNull() | (F.col(col_name) == "")).count()
            else:
                # For non-string columns (numeric, date, etc.), only check for null
                null_count = df.filter(F.col(col_name).isNull()).count()
                
            null_pct = (null_count / total_rows) * 100 if total_rows > 0 else 0.0
            null_counts[col_name] = (null_count, null_pct)
        
        # Duplicate analysis: key_columns, when given, define what makes a row unique.
        # (Always keying on customer_id would count repeat customers in
        # transaction-level data as duplicates.)
        deduplicated = df.dropDuplicates(key_columns) if key_columns else df.dropDuplicates()
        duplicates = total_rows - deduplicated.count()
        
        # Share of non-missing cells; guarded against empty DataFrames
        total_cells = total_rows * total_columns
        missing_cells = sum(count for count, _ in null_counts.values())
        completeness_score = (1 - missing_cells / total_cells) * 100 if total_cells > 0 else 0.0
        
        report = {
            'dataset': df_name,
            'total_rows': total_rows,
            'total_columns': total_columns,
            'null_analysis': null_counts,
            'duplicates': duplicates,
            'completeness_score': completeness_score
        }
        
        self.quality_reports.append(report)
        return report

# Test the data quality pipeline
print("🔬 Testing Data Quality Pipeline")

dq_pipeline = DataQualityPipeline(spark)

# 1. Schema Validation
print("\n1. Schema Validation:")
expected_customer_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("state", StringType(), True),
    StructField("signup_date", StringType(), True)
])

schema_issues = dq_pipeline.validate_schema(customers_df, expected_customer_schema)
if schema_issues:
    print("Schema validation issues found:")
    for issue in schema_issues:
        print(f"  - {issue}")
else:
    print("✅ Schema validation passed")

# 2. Missing Value Analysis and Handling
print("\n2. Missing Value Analysis:")
customers_cleaned = dq_pipeline.handle_missing_values(customers_df, strategy='smart')
transactions_cleaned = dq_pipeline.handle_missing_values(transactions_df, strategy='smart')

print(f"Customers: {customers_df.count():,} → {customers_cleaned.count():,} after cleaning")
print(f"Transactions: {transactions_df.count():,} → {transactions_cleaned.count():,} after cleaning")

# 3. Outlier Detection
print("\n3. Outlier Detection:")
customers_with_outliers = dq_pipeline.detect_outliers(customers_cleaned, "age", method='iqr')
age_outliers = customers_with_outliers.filter(F.col("is_outlier")).count()
print(f"Age outliers detected: {age_outliers} customers")

transactions_with_outliers = dq_pipeline.detect_outliers(transactions_cleaned, "amount", method='iqr')
amount_outliers = transactions_with_outliers.filter(F.col("is_outlier")).count()
print(f"Transaction amount outliers: {amount_outliers} transactions")

# 4. Business Rule Validation
print("\n4. Business Rule Validation:")
customer_violations = dq_pipeline.validate_business_rules(customers_cleaned)
transaction_violations = dq_pipeline.validate_business_rules(transactions_cleaned)

# 5. Data Standardization
print("\n5. Data Standardization:")
customers_standardized = dq_pipeline.standardize_formats(customers_cleaned)
print("Sample standardized customer data:")
customers_standardized.select(
    "customer_id", "name", "name_standardized", "email", "email_clean", "state", "state_code"
).show(3)

# 6. Quality Report Generation
print("\n📊 Quality Report Summary:")
customer_report = dq_pipeline.generate_quality_report(customers_standardized, "Customers")
transaction_report = dq_pipeline.generate_quality_report(transactions_cleaned, "Transactions")

for report in dq_pipeline.quality_reports:
    print(f"\n{report['dataset']} Quality Report:")
    print(f"  - Total Records: {report['total_rows']:,}")
    print(f"  - Columns: {report['total_columns']}")
    print(f"  - Completeness Score: {report['completeness_score']:.1f}%")
    print(f"  - Duplicates: {report['duplicates']:,}")
    
    # Show columns with highest missing values
    high_missing = [(col, pct) for col, (count, pct) in report['null_analysis'].items() if pct > 0]
    if high_missing:
        print("  - Columns with missing values:")
        for col_name, pct in sorted(high_missing, key=lambda x: x[1], reverse=True)[:3]:
            print(f"    • {col_name}: {pct:.1f}% missing")

# Validation
assert len(dq_pipeline.quality_reports) == 2, "Should have 2 quality reports"
assert customers_standardized.count() > 0, "Should have standardized customers"
assert transactions_cleaned.count() > 0, "Should have cleaned transactions"

print(f"\n✓ Exercise 3.1 completed!")
print(f"🔍 Comprehensive data quality pipeline implemented and tested")
print(f"📊 Analyzed {sum(r['total_rows'] for r in dq_pipeline.quality_reports):,} total records")
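
The pipeline test above calls `detect_outliers` with `method='iqr'` only. The `zscore` branch follows the rule |x - mean| / stddev > 3, which the plain-Python sketch below reproduces on invented amounts. It uses `statistics.stdev`, the sample standard deviation, on the assumption that this matches what `F.stddev` computes.

```python
import statistics

# Invented amounts: 20 ordinary values around 50 plus one extreme value
amounts = [48, 52, 50, 49, 51] * 4 + [500]

mean = statistics.mean(amounts)
stddev = statistics.stdev(amounts)  # sample standard deviation

# Same rule as the zscore branch: flag rows whose absolute z-score exceeds 3
z_scores = [abs(x - mean) / stddev for x in amounts]
outliers = [x for x, z in zip(amounts, z_scores) if z > 3]

print(f"mean={mean:.2f}, stddev={stddev:.2f}")
print(f"max z-score={max(z_scores):.2f}, outliers={outliers}")
```

A caveat on small groups: with the sample standard deviation, no z-score can exceed (n - 1) / √n, so at least 11 rows are needed before any value can pass the threshold of 3. The extreme value also inflates the mean and standard deviation it is measured against, which is one reason the IQR rule is often preferred for skewed data.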

## Summary: DataFrame Operations Mastery

### Advanced Capabilities Covered:

1. **Complex Joins**: Multi-table joins, self-joins, broadcast optimization
2. **Advanced Aggregations**: Pivot, cube, rollup, percentiles, window functions
3. **Schema Operations**: Type conversions, derived columns, validation
4. **Data Quality**: Missing value handling, outlier detection, standardization
5. **Performance**: Join optimization, aggregation strategies, window operations

### Key Techniques Mastered:

| **Operation** | **Technique** | **Use Case** |
|---------------|---------------|-------------|
| **Joins** | Broadcast joins for small tables | Performance optimization |
| **Aggregations** | Pivot/Cube for multi-dimensional analysis | Business intelligence |
| **Window Functions** | Moving averages, rankings, lag/lead | Time series analysis |
| **Data Quality** | Outlier detection, validation rules | Data reliability |
| **Schema Ops** | Type casting, derived columns | Data transformation |

### Production-Ready Patterns:

- ✅ **Comprehensive data quality pipeline** with automated validation
- ✅ **Multi-dimensional aggregation** using cube and rollup operations
- ✅ **Advanced time-series analysis** with window functions
- ✅ **Statistical outlier detection** using IQR and Z-score methods
- ✅ **Business rule validation** with configurable constraints
- ✅ **Performance optimization** through broadcast joins on small tables

### Best Practices Applied:
- Choose appropriate join types and broadcast small dimension tables
- Apply filters before joins to reduce shuffle data
- Use window functions efficiently with proper partitioning
- Handle data quality issues systematically with configurable pipelines
- Monitor and optimize DataFrame operations through execution plan analysis
- Implement comprehensive validation and error handling patterns

In [None]:
spark.stop()
print("🎉 Lab 6 completed! Advanced DataFrame operations mastered.")
print("➡️  Next: Lab 7 - User-Defined Functions")