In [2]:
# Spark + PostgreSQL Analytics Examples
# Save this as: spark/notebooks/spark_analytics_examples.ipynb

# ═══════════════════════════════════════════════════════════════════════════════
# SETUP & CONNECTION
# ═══════════════════════════════════════════════════════════════════════════════

import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import pandas as pd

# Initialize Spark with the PostgreSQL JDBC driver jar on the classpath.
spark = (
    SparkSession.builder
    .appName("E-Commerce Analytics")
    .config("spark.jars", "/opt/spark/jars/postgresql-42.7.5.jar")
    .getOrCreate()
)

# PostgreSQL connection settings.
# Credentials and URL can be supplied via the PG_USER, PG_PASSWORD and
# PG_JDBC_URL environment variables so secrets need not live in the notebook;
# the fallbacks are the demo defaults.
pg_properties = {
    "user": os.environ.get("PG_USER", "sparkuser"),
    "password": os.environ.get("PG_PASSWORD", "sparkpass"),
    "driver": "org.postgresql.Driver",
    "url": os.environ.get("PG_JDBC_URL", "jdbc:postgresql://postgres:5432/demo"),
}

print("✅ Spark session created successfully!")
print(f"Spark version: {spark.version}")

# ═══════════════════════════════════════════════════════════════════════════════
# DATA LOADING
# ═══════════════════════════════════════════════════════════════════════════════

# Load the main tables into Spark DataFrames.
# DataFrameReader.jdbc(url, table, ..., properties=...) takes the JDBC URL and
# table name as arguments and the remaining connection settings (user,
# password, driver) as a `properties` dict -- it does not accept them as
# keyword arguments, so they are split out of pg_properties here.
_jdbc_url = pg_properties["url"]
_jdbc_conn_props = {key: value for key, value in pg_properties.items() if key != "url"}


def load_pg_table(table):
    """Return a Spark DataFrame reading PostgreSQL table ``table`` over JDBC."""
    return spark.read.jdbc(url=_jdbc_url, table=table, properties=_jdbc_conn_props)


customers_df = load_pg_table("customers")
orders_df = load_pg_table("orders")
order_items_df = load_pg_table("order_items")
products_df = load_pg_table("products")
categories_df = load_pg_table("categories")
reviews_df = load_pg_table("reviews")

print("📊 Data loaded successfully!")
print(f"Customers: {customers_df.count():,}")
print(f"Orders: {orders_df.count():,}")
print(f"Order Items: {order_items_df.count():,}")
print(f"Products: {products_df.count():,}")
print(f"Categories: {categories_df.count():,}")
print(f"Reviews: {reviews_df.count():,}")

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 1: CUSTOMER ANALYTICS
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "=" * 60, "EXAMPLE 1: CUSTOMER LIFETIME VALUE ANALYSIS", "=" * 60, sep="\n")

# Expose the customer and order DataFrames to Spark SQL under their table names.
customers_df.createOrReplaceTempView("customers")
orders_df.createOrReplaceTempView("orders")

# Per customer tier: number of customers, average and total lifetime value, and
# average account age in days. Only customers with a positive LTV are counted;
# tiers are listed from highest to lowest average LTV.
customer_ltv_query = """
    SELECT 
        c.customer_tier,
        COUNT(DISTINCT c.id) as customer_count,
        AVG(c.total_lifetime_value) as avg_ltv,
        SUM(c.total_lifetime_value) as total_ltv,
        AVG(DATEDIFF(CURRENT_DATE(), c.registration_date)) as avg_days_since_registration
    FROM customers c
    WHERE c.total_lifetime_value > 0
    GROUP BY c.customer_tier
    ORDER BY avg_ltv DESC
"""
customer_ltv = spark.sql(customer_ltv_query)

customer_ltv.show()

# The per-tier summary is small, so collect it to the driver for plotting.
ltv_pandas = customer_ltv.toPandas()

fig, (ax_ltv, ax_share) = plt.subplots(1, 2, figsize=(12, 5))

# Left panel: average lifetime value for each tier.
ax_ltv.bar(ltv_pandas['customer_tier'], ltv_pandas['avg_ltv'])
ax_ltv.set_title('Average LTV by Customer Tier')
ax_ltv.set_ylabel('Average LTV ($)')
ax_ltv.tick_params(axis='x', labelrotation=45)

# Right panel: share of (positive-LTV) customers in each tier.
ax_share.pie(ltv_pandas['customer_count'], labels=ltv_pandas['customer_tier'], autopct='%1.1f%%')
ax_share.set_title('Customer Distribution by Tier')

fig.tight_layout()
plt.show()

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 2: SALES TREND ANALYSIS
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "=" * 60, "EXAMPLE 2: MONTHLY SALES TREND ANALYSIS", "=" * 60, sep="\n")

# Paid orders bucketed by calendar month ("yyyy-MM"): order count, distinct
# customers, revenue and average order value per month, oldest month first.
paid_orders = orders_df.filter(col("payment_status") == "paid")
monthly_sales = (
    paid_orders
    .withColumn("year_month", date_format(col("order_date"), "yyyy-MM"))
    .groupBy("year_month")
    .agg(
        count("*").alias("total_orders"),
        countDistinct("customer_id").alias("unique_customers"),
        sum("total_amount").alias("total_revenue"),
        avg("total_amount").alias("avg_order_value"),
    )
    .orderBy("year_month")
)

monthly_sales.show(20)

# Collect the monthly aggregates and parse the "yyyy-MM" keys into datetimes
# for the x-axis.
sales_pandas = monthly_sales.toPandas()
sales_pandas['year_month'] = pd.to_datetime(sales_pandas['year_month'])

# (metric column, panel title, y-axis label) for each cell of the 2x2 grid,
# in row-major order.
_sales_panels = [
    ('total_revenue', 'Monthly Revenue Trend', 'Revenue ($)'),
    ('total_orders', 'Monthly Order Volume', 'Number of Orders'),
    ('avg_order_value', 'Average Order Value Trend', 'AOV ($)'),
    ('unique_customers', 'Monthly Unique Customers', 'Unique Customers'),
]

plt.figure(figsize=(15, 10))
for _panel_no, (_metric, _title, _ylabel) in enumerate(_sales_panels, start=1):
    plt.subplot(2, 2, _panel_no)
    plt.plot(sales_pandas['year_month'], sales_pandas[_metric])
    plt.title(_title)
    plt.ylabel(_ylabel)
    plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 3: PRODUCT PERFORMANCE ANALYSIS
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "="*60)
print("EXAMPLE 3: TOP PERFORMING PRODUCTS")
print("="*60)

# Register the remaining tables as SQL views for the queries below.
products_df.createOrReplaceTempView("products")
order_items_df.createOrReplaceTempView("order_items")
categories_df.createOrReplaceTempView("categories")
reviews_df.createOrReplaceTempView("reviews")

# Top 20 products by paid revenue, with category and review information.
#
# Sales and reviews are aggregated per product in separate CTEs BEFORE being
# joined to products. Joining order_items and reviews directly to products
# pairs every sale with every review of that product, which multiplies
# revenue/units by the review count and review_count by the number of sales.
# The paid-only filter uses an INNER join to orders: a condition placed on a
# LEFT-joined table only nulls the order columns and still counts the item.
top_products = spark.sql("""
    WITH paid_sales AS (
        SELECT
            oi.product_id,
            COUNT(DISTINCT oi.order_id) AS total_orders,
            SUM(oi.quantity) AS units_sold,
            SUM(oi.line_total) AS total_revenue
        FROM order_items oi
        JOIN orders o ON oi.order_id = o.id
        WHERE o.payment_status = 'paid'
        GROUP BY oi.product_id
    ),
    review_stats AS (
        SELECT
            product_id,
            AVG(rating) AS avg_rating,
            COUNT(id) AS review_count
        FROM reviews
        GROUP BY product_id
    )
    SELECT
        p.name AS product_name,
        c.name AS category_name,
        p.brand,
        p.price,
        s.total_orders,
        s.units_sold,
        s.total_revenue,
        r.avg_rating,
        COALESCE(r.review_count, 0) AS review_count
    FROM products p
    JOIN paid_sales s ON p.id = s.product_id
    LEFT JOIN categories c ON p.category_id = c.id
    LEFT JOIN review_stats r ON p.id = r.product_id
    WHERE s.total_revenue IS NOT NULL
    ORDER BY total_revenue DESC
    LIMIT 20
""")

top_products.show(truncate=False)

# Category performance for top-level categories that have paid sales.
#
# Product counts, paid sales and ratings are each aggregated per category in
# their own CTE and then joined, so no aggregate is inflated by the
# sales x reviews fan-out of joining everything at row level, and avg_rating
# is a plain mean over reviews rather than weighted by sales rows.
# NOTE(review): products attached to a sub-category are not rolled up into
# their parent here -- confirm products reference top-level categories.
category_performance = spark.sql("""
    WITH product_counts AS (
        SELECT category_id, COUNT(DISTINCT id) AS product_count
        FROM products
        GROUP BY category_id
    ),
    category_sales AS (
        SELECT
            p.category_id,
            COUNT(DISTINCT oi.order_id) AS total_orders,
            SUM(oi.quantity) AS units_sold,
            SUM(oi.line_total) AS category_revenue,
            AVG(oi.unit_price) AS avg_price
        FROM order_items oi
        JOIN orders o ON oi.order_id = o.id
        JOIN products p ON oi.product_id = p.id
        WHERE o.payment_status = 'paid'
        GROUP BY p.category_id
    ),
    category_ratings AS (
        SELECT p.category_id, AVG(r.rating) AS avg_rating
        FROM reviews r
        JOIN products p ON r.product_id = p.id
        GROUP BY p.category_id
    )
    SELECT
        c.name AS category_name,
        COALESCE(pc.product_count, 0) AS product_count,
        s.total_orders,
        s.units_sold,
        s.category_revenue,
        s.avg_price,
        cr.avg_rating
    FROM categories c
    JOIN category_sales s ON c.id = s.category_id
    LEFT JOIN product_counts pc ON c.id = pc.category_id
    LEFT JOIN category_ratings cr ON c.id = cr.category_id
    WHERE c.parent_category_id IS NULL  -- Only top-level categories
      AND s.category_revenue IS NOT NULL
    ORDER BY category_revenue DESC
""")

category_performance.show()

# Collect the per-category summary and draw three bar charts side by side,
# one bar per category in the query's revenue order.
cat_pandas = category_performance.toPandas()
_cat_positions = range(len(cat_pandas))

plt.figure(figsize=(15, 5))

_category_panels = [
    ('category_revenue', 'Revenue by Category', 'Revenue ($)'),
    ('units_sold', 'Units Sold by Category', 'Units Sold'),
    ('avg_rating', 'Average Rating by Category', 'Average Rating'),
]
for _panel_no, (_column, _title, _ylabel) in enumerate(_category_panels, start=1):
    plt.subplot(1, 3, _panel_no)
    plt.bar(_cat_positions, cat_pandas[_column])
    plt.title(_title)
    plt.ylabel(_ylabel)
    plt.xticks(_cat_positions, cat_pandas['category_name'], rotation=45)
    if _column == 'avg_rating':
        # Pin the rating axis to 0-5 (presumably the rating scale -- confirm).
        plt.ylim(0, 5)

plt.tight_layout()
plt.show()

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 4: CUSTOMER SEGMENTATION WITH SPARK ML
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "="*60)
print("EXAMPLE 4: CUSTOMER SEGMENTATION USING RFM ANALYSIS")
print("="*60)

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# RFM analysis (Recency, Frequency, Monetary): one row per customer that has
# at least one paid order (the INNER join drops customers without any).
#   recency_days    - days from CURRENT_DATE() to the latest paid order
#   frequency       - number of paid orders
#   monetary_value  - total amount over paid orders
#   avg_order_value - mean amount per paid order
# Recency uses CURRENT_DATE(), consistent with the other examples; no Python
# variable is bound here, so pyspark's `current_date` function is not shadowed.
rfm_data = spark.sql("""
    SELECT
        c.id AS customer_id,
        c.first_name || ' ' || c.last_name AS customer_name,
        c.customer_tier,
        DATEDIFF(CURRENT_DATE(), MAX(o.order_date)) AS recency_days,
        COUNT(o.id) AS frequency,
        SUM(o.total_amount) AS monetary_value,
        AVG(o.total_amount) AS avg_order_value
    FROM customers c
    JOIN orders o ON c.id = o.customer_id AND o.payment_status = 'paid'
    GROUP BY c.id, c.first_name, c.last_name, c.customer_tier
""")

rfm_data.show(20)

from pyspark.ml.feature import StandardScaler

# Assemble the raw recency / frequency / monetary columns into one vector.
assembler = VectorAssembler(
    inputCols=["recency_days", "frequency", "monetary_value"],
    outputCol="features"
)

rfm_features = assembler.transform(rfm_data)

# K-means clusters by Euclidean distance, so features on very different scales
# (days vs. order counts vs. currency totals) would let the largest-magnitude
# column decide the clusters. Scale each feature to unit standard deviation
# first. Centering (withMean) is unnecessary: shifting every point by the same
# vector does not change the distances between them.
scaler = StandardScaler(
    inputCol="features", outputCol="scaled_features", withStd=True, withMean=False
)
rfm_scaled = scaler.fit(rfm_features).transform(rfm_features)

# Apply K-means clustering on the scaled features.
kmeans = KMeans(k=4, seed=42, featuresCol="scaled_features", predictionCol="segment")
model = kmeans.fit(rfm_scaled)
segmented_customers = model.transform(rfm_scaled)

# Profile each segment using the original (unscaled) RFM columns.
segment_analysis = segmented_customers.groupBy("segment") \
    .agg(
        count("*").alias("customer_count"),
        avg("recency_days").alias("avg_recency"),
        avg("frequency").alias("avg_frequency"),
        avg("monetary_value").alias("avg_monetary"),
        avg("avg_order_value").alias("avg_order_value")
    ) \
    .orderBy("segment")

print("Customer Segments Analysis:")
segment_analysis.show()

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 5: ADVANCED WINDOW FUNCTIONS & CUSTOMER LOYALTY ANALYTICS
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "="*60)
print("EXAMPLE 5: ADVANCED WINDOW FUNCTIONS & ANALYTICS")
print("="*60)

# Customer purchase patterns using window functions
from pyspark.sql.window import Window

# One window per customer in chronological order. The order id breaks ties
# between orders placed on the same date, so row_number()/lag() are
# deterministic across runs.
customer_window = Window.partitionBy("customer_id").orderBy("order_date", "id")

# Per paid order: its sequence number, the previous order's date, the gap in
# days since that order (NULL for the first order), and the cumulative spend.
customer_patterns = orders_df.filter(col("payment_status") == "paid") \
    .withColumn("order_number", row_number().over(customer_window)) \
    .withColumn("prev_order_date", lag("order_date").over(customer_window)) \
    .withColumn("days_between_orders",
                datediff(col("order_date"), col("prev_order_date"))) \
    .withColumn("running_total",
                sum("total_amount").over(customer_window.rowsBetween(Window.unboundedPreceding, Window.currentRow)))

# Customer loyalty metrics over ALL of a customer's paid orders. The first
# order is deliberately kept: avg() already skips its NULL gap, and dropping
# it would exclude that order's amount from the stddev. Lifetime value is the
# plain sum, which stays correct even if some amounts are negative (unlike
# max(running_total)).
loyalty_analysis = customer_patterns \
    .groupBy("customer_id") \
    .agg(
        max("order_number").alias("total_orders"),
        avg("days_between_orders").alias("avg_days_between_orders"),
        sum("total_amount").alias("lifetime_value"),
        stddev("total_amount").alias("order_value_consistency")
    ) \
    .filter(col("total_orders") >= 3)  # Customers with 3+ orders

print("Customer Loyalty Metrics:")
loyalty_analysis.show(20)

# ═══════════════════════════════════════════════════════════════════════════════
# EXAMPLE 6: INVENTORY & STOCK ANALYSIS
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "="*60)
print("EXAMPLE 6: INVENTORY OPTIMIZATION ANALYSIS")
print("="*60)

# Products at or below their reorder level, with their category name.
# Both products and categories have a `name` column, so the DataFrames are
# aliased and every column is qualified; an unqualified "name" after the join
# is ambiguous and fails analysis. A left join keeps low-stock products that
# have no (or an unknown) category instead of silently dropping them.
low_stock_products = (
    products_df.alias("p")
    .filter(col("p.stock_quantity") <= col("p.reorder_level"))
    .join(categories_df.alias("c"), col("p.category_id") == col("c.id"), "left")
    .select(
        col("p.sku").alias("sku"),
        col("p.name").alias("name"),
        col("p.stock_quantity").alias("stock_quantity"),
        col("p.reorder_level").alias("reorder_level"),
        col("c.name").alias("category"),
    )
)

print("Products Needing Restock:")
low_stock_products.show(truncate=False)

# Sales velocity over the last 90 days, for products with paid sales in that
# window: units sold, average daily rate, and how many days the current stock
# would last at that rate (lowest first = most urgent).
# INNER joins state the intent directly (the WHERE on `o` already discarded
# unmatched rows). NULLIF guards the division: the projection is evaluated
# before HAVING filters groups, and with ANSI mode on, a zero divisor would
# raise instead of yielding NULL.
sales_velocity = spark.sql("""
    SELECT
        p.sku,
        p.name AS product_name,
        p.stock_quantity,
        p.reorder_level,
        SUM(oi.quantity) AS total_sold_90days,
        SUM(oi.quantity) / 90.0 AS daily_sales_rate,
        p.stock_quantity / NULLIF(SUM(oi.quantity) / 90.0, 0) AS days_of_inventory
    FROM products p
    JOIN order_items oi ON p.id = oi.product_id
    JOIN orders o ON oi.order_id = o.id
    WHERE o.payment_status = 'paid'
      AND o.order_date >= DATE_SUB(CURRENT_DATE(), 90)
    GROUP BY p.id, p.sku, p.name, p.stock_quantity, p.reorder_level
    HAVING SUM(oi.quantity) > 0
    ORDER BY days_of_inventory ASC
""")

print("Sales Velocity Analysis (Products with lowest days of inventory):")
sales_velocity.show(20)

# ═══════════════════════════════════════════════════════════════════════════════
# CLEANUP
# ═══════════════════════════════════════════════════════════════════════════════

print("\n" + "=" * 60, "🎉 ANALYSIS COMPLETE!", "=" * 60, "Key insights generated:", sep="\n")

# One line per analysis produced by this notebook.
for _insight_line in (
    "✅ Customer LTV analysis by tier",
    "✅ Monthly sales trends and patterns",
    "✅ Product and category performance",
    "✅ Customer segmentation using RFM analysis",
    "✅ Customer loyalty and purchasing patterns",
    "✅ Inventory optimization recommendations",
):
    print(_insight_line)

print("\nYou can now explore the data further or create your own analyses!")

# The Spark session is intentionally left running for interactive use;
# call spark.stop() when you are finished with it.

ModuleNotFoundError: No module named 'pyspark'