# Apache Spark Big Data Analytics Notebook

This notebook demonstrates comprehensive Apache Spark operations including:
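- RDD operations
- DataFrame operations (the typed Dataset API is Scala/Java-only and is not available in PySpark)
- Spark SQL
- Machine Learning with MLlib
- Streaming-style analytics (simulated with a batch read of event data)
- Performance optimization techniques
- Data visualization with matplotlib
- Note: graph processing is not covered, because GraphX has no Python API (GraphFrames is the usual PySpark alternative)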

## Prerequisites
- Apache Spark 3.5+
- Java 17+
- Python 3.8+ with PySpark, pandas, matplotlib, and seaborn installed (all examples use PySpark)

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark Session.
# getOrCreate() reuses an already-active session if there is one. Adaptive
# Query Execution is enabled together with its shuffle-partition coalescing.
session_builder = (
    SparkSession.builder
    .appName("BigDataAnalytics")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
)
spark = session_builder.getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")

## 1. Data Loading and Exploration

In [None]:
# Load the large sales dataset.
# The header row supplies column names; inferSchema makes Spark take an extra
# pass over the file to infer column types.
sales_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("../data/large_sales_data.csv")
)

# Display basic information (count() is an action and scans the data)
print("Dataset Shape:")
row_total = sales_df.count()
print(f"Rows: {row_total}")
column_total = len(sales_df.columns)
print(f"Columns: {column_total}")

print("\nSchema:")
sales_df.printSchema()

print("\nFirst 5 rows:")
sales_df.show(n=5)

In [None]:
# Data quality analysis
print("Data Quality Analysis:")

# One aggregate per column. when() without otherwise() yields NULL for rows
# where the column is not NULL, and count() skips NULLs. The value is the
# literal column-name string, so rows where the column IS NULL produce a
# non-NULL value and get counted.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in sales_df.columns
]
null_counts = sales_df.select(*null_count_exprs)
null_counts.show()

# describe(): count, mean, stddev, min and max for each column
print("\nBasic Statistics:")
sales_df.describe().show()

## 2. RDD Operations

In [None]:
# Convert DataFrame to RDD for RDD operations
sales_rdd = sales_df.rdd

print(f"RDD Partitions: {sales_rdd.getNumPartitions()}")

# Map operation - extract sales amounts.
# Python's + and > raise TypeError on None, so rows with a NULL sales_amount
# are dropped here. This also matches Spark SQL, whose SUM/AVG ignore NULLs.
sales_amounts = (
    sales_rdd
    .map(lambda row: row.sales_amount)
    .filter(lambda amount: amount is not None)
)
print(f"\nTotal Sales: ${sales_amounts.sum():,.2f}")
print(f"Average Sales: ${sales_amounts.mean():,.2f}")

# Filter operation - high value sales (strictly > $2000; NULL amounts excluded)
high_value_sales = sales_rdd.filter(
    lambda row: row.sales_amount is not None and row.sales_amount > 2000
)
print(f"\nHigh Value Sales Count: {high_value_sales.count()}")

# Word-count pattern on product categories: emit (category, 1), sum per key,
# then order by frequency, most common first.
categories = (
    sales_rdd
    .map(lambda row: (row.category, 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)

print("\nCategory Counts:")
# The loop variable must NOT be called `count`. That would rebind the
# star-imported pyspark.sql.functions.count, and later cells calling
# count("*") would fail with "'int' object is not callable".
for category, category_count in categories.collect():
    print(f"{category}: {category_count}")

## 3. DataFrame Operations and Spark SQL

In [None]:
from pyspark.sql.window import Window

# Expose sales_df to Spark SQL under the table name `sales`.
sales_df.createOrReplaceTempView("sales")

# Per (region, category) pair: transaction count, revenue total and average,
# largest single sale and units sold, ordered by revenue (highest first).
revenue_by_region = spark.sql("""
    SELECT 
        region,
        category,
        COUNT(*) as transaction_count,
        SUM(sales_amount) as total_revenue,
        AVG(sales_amount) as avg_transaction,
        MAX(sales_amount) as max_transaction,
        SUM(quantity) as total_quantity
    FROM sales
    GROUP BY region, category
    ORDER BY total_revenue DESC
""")

print("Revenue Analysis by Region and Category:")
revenue_by_region.show(20)

# Window functions: number each region's sales from largest to smallest.
# row_number() breaks ties arbitrarily, so equal amounts at the cut-off may
# appear in either order.
window_spec = Window.partitionBy("region").orderBy(col("sales_amount").desc())
ranked_sales = sales_df.withColumn("rank", row_number().over(window_spec))

print("\nTop 3 Sales per Region:")
ranked_sales.where(col("rank") <= 3).show()

In [None]:
# Advanced DataFrame operations

# Per-customer spending profile. max() is used only to pick a single
# age/income value per customer — presumably constant per customer; TODO confirm.
spending_aggregates = [
    sum("sales_amount").alias("total_spent"),
    count("*").alias("transaction_count"),
    avg("sales_amount").alias("avg_transaction"),
    max("customer_age").alias("age"),
    max("customer_income").alias("income"),
]
customer_spending = sales_df.groupBy("customer_id").agg(*spending_aggregates)

# Tiering by lifetime spend: >= 5000 High, >= 2000 Medium, everything else
# (including a NULL total) Low.
segment_expr = (
    when(col("total_spent") >= 5000, "High Value")
    .when(col("total_spent") >= 2000, "Medium Value")
    .otherwise("Low Value")
)
customer_segments = customer_spending.withColumn("segment", segment_expr)

print("Customer Segmentation:")
customer_segments.groupBy("segment").count().show()

# Seasonal analysis over the `sales` temp view, highest revenue first
seasonal_analysis = spark.sql("""
    SELECT 
        season,
        COUNT(*) as transactions,
        SUM(sales_amount) as revenue,
        AVG(sales_amount) as avg_transaction,
        AVG(discount) as avg_discount
    FROM sales
    GROUP BY season
    ORDER BY revenue DESC
""")

print("\nSeasonal Analysis:")
seasonal_analysis.show()

## 4. Machine Learning with MLlib

In [None]:
# Prepare data for machine learning
# Predict whether a transaction is high-value: label 1.0 when
# sales_amount >= 2000, else 0.0 (a NULL sales_amount also becomes 0.0).

# Feature engineering
ml_data = sales_df.withColumn(
    "high_value",
    when(col("sales_amount") >= 2000, 1.0).otherwise(0.0)
)

# Convert categorical variables to numeric
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# String indexers for categorical variables.
# handleInvalid="keep" puts NULLs and labels not seen during fitting into an
# extra index instead of failing at transform time. Example: a rare category
# that randomSplit placed only in the test set.
region_indexer = StringIndexer(inputCol="region", outputCol="region_index", handleInvalid="keep")
category_indexer = StringIndexer(inputCol="category", outputCol="category_index", handleInvalid="keep")
season_indexer = StringIndexer(inputCol="season", outputCol="season_index", handleInvalid="keep")
channel_indexer = StringIndexer(inputCol="marketing_channel", outputCol="channel_index", handleInvalid="keep")

# One-hot encoders (default dropLast=True: the last category is the all-zeros vector)
region_encoder = OneHotEncoder(inputCol="region_index", outputCol="region_encoded")
category_encoder = OneHotEncoder(inputCol="category_index", outputCol="category_encoded")
season_encoder = OneHotEncoder(inputCol="season_index", outputCol="season_encoded")
channel_encoder = OneHotEncoder(inputCol="channel_index", outputCol="channel_encoded")

# Feature vector assembler. Numeric columns are used as-is; with the default
# handleInvalid="error", a NULL/NaN in them raises during fit/transform.
feature_cols = ["quantity", "discount", "customer_age", "customer_income",
                "region_encoded", "category_encoded", "season_encoded", "channel_encoded"]
vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Feature scaling (defaults: withStd=True, withMean=False). Features are scaled
# to unit standard deviation without centering, which keeps sparse vectors sparse.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Logistic regression model
lr = LogisticRegression(featuresCol="scaled_features", labelCol="high_value")

# Create ML pipeline: indexers must run before encoders, and the assembler
# before the scaler and the model.
pipeline = Pipeline(stages=[
    region_indexer, category_indexer, season_indexer, channel_indexer,
    region_encoder, category_encoder, season_encoder, channel_encoder,
    vector_assembler, scaler, lr
])

print("Machine Learning Pipeline created successfully!")

In [None]:
# Train-test split (fixed seed: reproducible for the same input data and partitioning)
train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)

print(f"Training data: {train_data.count()} rows")
print(f"Test data: {test_data.count()} rows")

# Train the model (fits every pipeline stage on the training split only)
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Show predictions
predictions.select("customer_id", "sales_amount", "high_value", "probability", "prediction").show(10)

# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol="high_value", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print("\nModel Performance:")
print(f"AUC-ROC: {auc:.4f}")

# Confusion matrix: one row per (actual label, predicted label) pair.
# Keep the DataFrame (show() returns None) and sort it for a stable layout.
confusion_matrix = predictions.groupBy("high_value", "prediction").count()
confusion_matrix.orderBy("high_value", "prediction").show()

# Logistic regression has no feature importances; the learned coefficients
# (one per entry of the scaled feature vector, with one-hot columns expanded)
# are the closest equivalent.
print("\nLogistic Regression Coefficients:")
lr_model = model.stages[-1]
coefficients = lr_model.coefficients.toArray()
print(f"Number of features: {len(coefficients)}")

## 5. Streaming Analytics (Simulated)

In [None]:
# Simulate streaming data analysis.
# This is a plain batch read (spark.read, not readStream), so the queries
# below run once over the static event file.
streaming_df = spark.read.json("../data/streaming_events.json")

print("Streaming Events Data:")
streaming_df.show(10)

# Real-time analytics simulation: expose the events to SQL as `events`
streaming_df.createOrReplaceTempView("events")

# Per event type: volume, mean duration and distinct users, busiest first
event_analysis_query = """
    SELECT 
        event,
        COUNT(*) as event_count,
        AVG(duration) as avg_duration,
        COUNT(DISTINCT userId) as unique_users
    FROM events
    GROUP BY event
    ORDER BY event_count DESC
"""
event_analysis = spark.sql(event_analysis_query)

print("\nEvent Analysis:")
event_analysis.show()

# Per (device type, location): session volume and mean duration
device_location_query = """
    SELECT 
        deviceType,
        location,
        COUNT(*) as sessions,
        AVG(duration) as avg_session_duration
    FROM events
    GROUP BY deviceType, location
    ORDER BY sessions DESC
"""
device_location_analysis = spark.sql(device_location_query)

print("\nDevice and Location Analysis:")
device_location_analysis.show()

## 6. Performance Optimization

In [None]:
# Performance analysis and optimization

# Check current Spark configuration
print("Current Spark Configuration:")
important_configs = [
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled",
    "spark.serializer",
    "spark.sql.autoBroadcastJoinThreshold"
]

for config in important_configs:
    # Pass None as the default and substitute the placeholder afterwards.
    # For registered SQL configs, Spark validates a non-None default against
    # the config's type, so a placeholder like "Not Set" can be rejected for
    # boolean / byte-size keys.
    value = spark.conf.get(config, None)
    if value is None:
        value = "Not Set"
    print(f"{config}: {value}")

# Analyze partitioning
print(f"\nSales DataFrame Partitions: {sales_df.rdd.getNumPartitions()}")

# Cache frequently used DataFrame. cache() is lazy; the count() action
# materializes every partition into the cache.
sales_df.cache()
sales_df.count()  # Trigger caching

print("\nDataFrame cached for better performance")

# Demonstrate broadcast join
# Create a small lookup table
category_lookup = spark.createDataFrame([
    ("Electronics", "Tech"),
    ("Clothing", "Fashion"),
    ("Home", "Lifestyle"),
    ("Books", "Education")
], ["category", "category_group"])

# broadcast() is an explicit hint: the lookup table is sent to every executor
# even if its size estimate exceeds spark.sql.autoBroadcastJoinThreshold, so
# sales_df is joined without a shuffle. A left join keeps sales whose category
# is not in the lookup (category_group is NULL for them) instead of dropping them.
enriched_sales = sales_df.join(broadcast(category_lookup), on="category", how="left")

print("\nEnriched Sales with Category Groups:")
enriched_sales.select("product_id", "category", "category_group", "sales_amount").show(10)

## 7. Data Visualization

In [None]:
# Convert Spark DataFrame to Pandas for visualization. revenue_by_region is
# already aggregated to one row per (region, category), so collecting it to
# the driver is cheap.
revenue_pandas = revenue_by_region.toPandas()


def _plot_bar(ax, series, title, ylabel):
    """Draw a pandas Series as a bar chart on ``ax`` with 45-degree x labels."""
    ax.bar(series.index, series.values)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.tick_params(axis='x', rotation=45)


# Set up the plotting style ('seaborn-v0_8' is the name matplotlib 3.6+ uses
# for its bundled seaborn style)
plt.style.use('seaborn-v0_8')
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# Revenue by Region
region_revenue = revenue_pandas.groupby('region')['total_revenue'].sum().sort_values(ascending=False)
_plot_bar(axes[0, 0], region_revenue, 'Total Revenue by Region', 'Revenue ($)')

# Revenue by Category
category_revenue = revenue_pandas.groupby('category')['total_revenue'].sum().sort_values(ascending=False)
_plot_bar(axes[0, 1], category_revenue, 'Total Revenue by Category', 'Revenue ($)')

# Transaction Count by Region
region_transactions = revenue_pandas.groupby('region')['transaction_count'].sum()
axes[1, 0].pie(region_transactions.values, labels=region_transactions.index, autopct='%1.1f%%')
axes[1, 0].set_title('Transaction Distribution by Region')

# Average Transaction by Category, weighted by transaction count:
# total revenue / total transactions per category. Taking the plain mean of
# the per-region avg_transaction values would give a region with a handful
# of sales the same weight as one with thousands.
category_totals = revenue_pandas.groupby('category')[['total_revenue', 'transaction_count']].sum()
avg_transaction = (
    category_totals['total_revenue'] / category_totals['transaction_count']
).sort_values(ascending=False)
_plot_bar(axes[1, 1], avg_transaction, 'Average Transaction Value by Category', 'Average Transaction ($)')

plt.tight_layout()
plt.show()

print("Visualizations generated successfully!")

## 8. Summary and Cleanup

In [None]:
# Summary statistics
print("=== ANALYSIS SUMMARY ===")
print(f"Total Records Processed: {sales_df.count():,}")

# Both aggregates in one job. SUM/AVG return NULL for an empty (or all-NULL)
# column; fall back to 0.0 so the currency formatting cannot fail on None.
revenue_totals = sales_df.agg(
    sum('sales_amount').alias('total'),
    avg('sales_amount').alias('average'),
).first()
summary_revenue = revenue_totals['total'] or 0.0
summary_average = revenue_totals['average'] or 0.0
print(f"Total Revenue: ${summary_revenue:,.2f}")
print(f"Average Transaction: ${summary_average:,.2f}")
print(f"Unique Customers: {sales_df.select('customer_id').distinct().count()}")
print(f"Unique Products: {sales_df.select('product_id').distinct().count()}")

# Performance metrics
print("\n=== PERFORMANCE METRICS ===")
print(f"Spark Version: {spark.version}")
print(f"Default Parallelism: {spark.sparkContext.defaultParallelism}")
print(f"Application ID: {spark.sparkContext.applicationId}")


def _top_by_revenue(df, group_col):
    """Return the ``group_col`` value with the largest summed sales_amount (None if df is empty)."""
    top_row = (
        df.groupBy(group_col)
        .agg(sum("sales_amount").alias("revenue"))
        .orderBy(desc("revenue"))
        .first()
    )
    return None if top_row is None else top_row[group_col]


# Recommendations: the headline findings are derived from the data instead of
# being hard-coded, so they stay correct if the dataset changes.
top_category = _top_by_revenue(sales_df, "category")
top_region = _top_by_revenue(sales_df, "region")

print("\n=== RECOMMENDATIONS ===")
print(f"1. {top_category} category generates highest revenue")
print(f"2. {top_region} region generates highest revenue")
print("3. Consider targeted marketing for high-value customer segments")
print("4. Optimize inventory based on seasonal patterns")

# Cleanup
sales_df.unpersist()  # Remove from cache
print("\nCache cleared and analysis complete!")

In [None]:
# Stop Spark session: SparkSession.stop() stops the underlying SparkContext,
# so no further Spark operations can run in this notebook after this cell.
spark.stop()
print("Spark session stopped successfully!")