<a href="https://colab.research.google.com/github/Arunkumar-V01/Arunkumar-V01/blob/main/pyspark_analysis_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# E-commerce Sales Data Analysis with PySpark
# A complete data engineering project demonstrating PySpark capabilities

# Cell 1: Setup and Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Build (or reuse) the Spark session that every later cell relies on
spark = (
    SparkSession.builder
    .appName("EcommerceSalesAnalysis")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .getOrCreate()
)

# Report the runtime version and the address of the Spark web UI
print(f"Spark Version: {spark.version}")
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")

# Cell 2: Create Sample Dataset
# Generate realistic e-commerce data
import random
from datetime import datetime, timedelta

# Create sample data
def generate_sample_data(num_records=10000, seed=None):
    """Generate synthetic e-commerce order records.

    Args:
        num_records: Number of order records to generate.
        seed: Optional seed for reproducible output. When given, a private
            ``random.Random(seed)`` instance is used; when ``None`` the
            module-level ``random`` generator is used, so callers that seed
            ``random`` globally keep getting the same sequence as before.

    Returns:
        A list of ``num_records`` dicts, one per order, with the keys
        ``order_id``, ``customer_id``, ``product_name``, ``category``,
        ``price``, ``quantity``, ``total_amount``, ``order_date``
        (``YYYY-MM-DD`` string), ``region``, ``payment_method``,
        ``discount_percent``, ``customer_age`` and ``is_premium_customer``.
    """
    # The file star-imports pyspark.sql.functions, which shadows the built-in
    # ``round`` with Spark's column function.  ``__builtins__`` is only a
    # module inside ``__main__`` (it is a plain dict in imported modules), so
    # the real ``round`` is taken from the ``builtins`` module instead.
    import builtins

    rng = random if seed is None else random.Random(seed)

    categories = ['Electronics', 'Clothing', 'Books', 'Home & Garden', 'Sports', 'Beauty']
    products = {
        'Electronics': ['Laptop', 'Phone', 'Tablet', 'Headphones', 'Camera'],
        'Clothing': ['Shirt', 'Jeans', 'Dress', 'Jacket', 'Shoes'],
        'Books': ['Fiction', 'Non-Fiction', 'Textbook', 'Comics', 'Biography'],
        'Home & Garden': ['Furniture', 'Decor', 'Tools', 'Plants', 'Kitchen'],
        'Sports': ['Equipment', 'Apparel', 'Footwear', 'Accessories', 'Supplements'],
        'Beauty': ['Skincare', 'Makeup', 'Haircare', 'Fragrance', 'Tools'],
    }

    # (min, max) unit price per category; loop-invariant, so built once
    price_ranges = {
        'Electronics': (100, 2000),
        'Clothing': (20, 200),
        'Books': (10, 50),
        'Home & Garden': (25, 500),
        'Sports': (15, 300),
        'Beauty': (5, 100),
    }

    regions = ['North', 'South', 'East', 'West', 'Central']
    payment_methods = ['Credit Card', 'Debit Card', 'PayPal', 'Cash', 'UPI']

    data = []
    base_date = datetime(2023, 1, 1)

    for i in range(num_records):
        category = rng.choice(categories)
        product = rng.choice(products[category])

        min_price, max_price = price_ranges[category]
        price = builtins.round(rng.uniform(min_price, max_price), 2)
        quantity = rng.randint(1, 5)

        # Offsets 0..364 from 2023-01-01 keep every order inside 2023
        order_date = base_date + timedelta(days=rng.randint(0, 364))

        data.append({
            'order_id': f'ORD_{str(i + 1).zfill(6)}',
            'customer_id': f'CUST_{rng.randint(1, 2000)}',
            'product_name': f'{category}_{product}',
            'category': category,
            'price': price,
            'quantity': quantity,
            'total_amount': builtins.round(price * quantity, 2),
            'order_date': order_date.strftime('%Y-%m-%d'),
            'region': rng.choice(regions),
            'payment_method': rng.choice(payment_methods),
            'discount_percent': rng.choice([0, 5, 10, 15, 20]),
            'customer_age': rng.randint(18, 70),
            'is_premium_customer': rng.choice([True, False]),
        })

    return data

# Build the in-memory sample that the Spark DataFrame is created from
print("Generating sample e-commerce data...")
sample_data = generate_sample_data(num_records=10000)
print(f"Generated {len(sample_data)} sample records")

# Cell 3: Create Spark DataFrame
# An explicit schema fixes every column type up front instead of inferring it
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("product_name", StringType()),
        ("category", StringType()),
        ("price", DoubleType()),
        ("quantity", IntegerType()),
        ("total_amount", DoubleType()),
        ("order_date", StringType()),
        ("region", StringType()),
        ("payment_method", StringType()),
        ("discount_percent", IntegerType()),
        ("customer_age", IntegerType()),
        ("is_premium_customer", BooleanType()),
    ]
])

df = (
    spark.createDataFrame(sample_data, schema)
    # Parse the ISO date strings into a real date column
    .withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))
    # Order value after applying the percentage discount
    .withColumn("discounted_amount",
                col("total_amount") * (1 - col("discount_percent") / 100))
    # Calendar parts used by the time-based aggregations
    .withColumn("month", month(col("order_date")))
    .withColumn("year", year(col("order_date")))
)

# The DataFrame is reused by every later cell, so keep it cached
df.cache()

print(f"DataFrame created with {df.count()} records")
print("\nSchema:")
df.printSchema()

# Cell 4: Basic Data Exploration
print("=== BASIC DATA EXPLORATION ===")
print(f"Total Records: {df.count()}")
print(f"Total Columns: {len(df.columns)}")

# First few rows, untruncated
print("\nSample Data:")
df.show(5, truncate=False)

# count / mean / stddev / min / max for the numeric columns
print("\nBasic Statistics:")
numeric_preview = df.select("price", "quantity", "total_amount", "customer_age")
numeric_preview.describe().show()

# One output column per input column holding its number of NULLs
print("\nNull Value Check:")
null_flag_sums = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
null_counts = df.select(null_flag_sums)
null_counts.show()

# Cell 5: Sales Analysis by Category
print("=== SALES ANALYSIS BY CATEGORY ===")

# Both category reports aggregate over the same grouping
orders_by_category = df.groupBy("category")

# Revenue, order volume, average order value and units sold,
# highest-earning category first
category_revenue = (
    orders_by_category
    .agg(
        sum("discounted_amount").alias("total_revenue"),
        count("order_id").alias("order_count"),
        avg("discounted_amount").alias("avg_order_value"),
        sum("quantity").alias("total_quantity"),
    )
    .orderBy(col("total_revenue").desc())
)

print("Revenue by Category:")
category_revenue.show()

# Wider set of per-category metrics (customer reach, price level, order extremes)
category_metrics = orders_by_category.agg(
    sum("discounted_amount").alias("revenue"),
    count("order_id").alias("orders"),
    countDistinct("customer_id").alias("unique_customers"),
    avg("price").alias("avg_price"),
    max("total_amount").alias("max_order"),
    min("total_amount").alias("min_order"),
)

print("Detailed Category Metrics:")
category_metrics.show()

# Cell 6: Time-based Analysis
print("=== TIME-BASED ANALYSIS ===")

# Revenue per calendar month, in chronological order
monthly_revenue = (
    df.groupBy("year", "month")
    .agg(sum("discounted_amount").alias("revenue"))
    .orderBy("year", "month")
)

print("Monthly Revenue Trend:")
monthly_revenue.show(12)

# Abbreviated weekday name of each order date
df_with_dow = df.withColumn("day_of_week", date_format(col("order_date"), "E"))

# Revenue, order volume and average order value per weekday
daily_pattern = df_with_dow.groupBy("day_of_week").agg(
    sum("discounted_amount").alias("revenue"),
    count("order_id").alias("order_count"),
    avg("discounted_amount").alias("avg_order_value"),
)

print("Sales Pattern by Day of Week:")
daily_pattern.show()

# Cell 7: Customer Analysis
print("=== CUSTOMER ANALYSIS ===")

# Map each customer age onto a ten-year bracket; the first matching
# upper bound wins, everything from 55 upwards falls into "55+"
age_bucket = (
    when(col("customer_age") < 25, "18-24")
    .when(col("customer_age") < 35, "25-34")
    .when(col("customer_age") < 45, "35-44")
    .when(col("customer_age") < 55, "45-54")
    .otherwise("55+")
)
df_with_age_group = df.withColumn("age_group", age_bucket)

# Revenue, order volume, customer reach and average order value per bracket
age_group_analysis = (
    df_with_age_group.groupBy("age_group")
    .agg(
        sum("discounted_amount").alias("revenue"),
        count("order_id").alias("orders"),
        countDistinct("customer_id").alias("customers"),
        avg("discounted_amount").alias("avg_order_value"),
    )
    .orderBy("age_group")
)

print("Customer Analysis by Age Group:")
age_group_analysis.show()

# Same headline metrics split by the premium flag, plus the average discount
premium_analysis = df.groupBy("is_premium_customer").agg(
    sum("discounted_amount").alias("revenue"),
    count("order_id").alias("orders"),
    avg("discounted_amount").alias("avg_order_value"),
    avg("discount_percent").alias("avg_discount"),
)

print("Premium vs Regular Customer Analysis:")
premium_analysis.show()

# Cell 8: Regional Analysis
print("=== REGIONAL ANALYSIS ===")

# Headline metrics per region, highest revenue first
regional_performance = (
    df.groupBy("region")
    .agg(
        sum("discounted_amount").alias("revenue"),
        count("order_id").alias("orders"),
        countDistinct("customer_id").alias("customers"),
        avg("discounted_amount").alias("avg_order_value"),
    )
    .orderBy(col("revenue").desc())
)

print("Regional Performance:")
regional_performance.show()

# Revenue of every category inside every region
regional_category = (
    df.groupBy("region", "category")
    .agg(sum("discounted_amount").alias("revenue"))
    .orderBy("region", desc("revenue"))
)

print("Regional Category Preferences (Top 2 per region):")
from pyspark.sql.window import Window

# Number the categories inside each region by descending revenue and keep
# the first two; the helper column is dropped again afterwards
window_spec = Window.partitionBy("region").orderBy(desc("revenue"))
ranked_categories = regional_category.withColumn("rank", row_number().over(window_spec))
top_categories_per_region = ranked_categories.filter(col("rank") <= 2).drop("rank")

top_categories_per_region.show()

# Cell 9: Advanced Analytics - Customer Lifetime Value
print("=== ADVANCED ANALYTICS ===")

# Per-customer spend, order count, average order value and first/last order dates
customer_metrics = df.groupBy("customer_id") \
    .agg(
        sum("discounted_amount").alias("total_spent"),
        count("order_id").alias("order_frequency"),
        avg("discounted_amount").alias("avg_order_value"),
        max("order_date").alias("last_order_date"),
        min("order_date").alias("first_order_date")
    )

# Days between first and last order, counting both end dates (+1); this also
# keeps the CLV denominator non-zero for customers with a single order day
customer_metrics = customer_metrics.withColumn(
    "customer_lifetime_days",
    datediff(col("last_order_date"), col("first_order_date")) + 1
)

# CLV score: total spend scaled by order frequency per active day
customer_metrics = customer_metrics.withColumn(
    "clv_score",
    col("total_spent") * col("order_frequency") / col("customer_lifetime_days")
)

print("Top 10 Customers by CLV:")
customer_metrics.orderBy(desc("clv_score")).show(10)

# Customer segmentation based on RFM (Recency, Frequency, Monetary)

# Recency is measured against the most recent order in the dataset, not
# against today's date: the orders are historical, so "days since today"
# would exceed every threshold below and put all customers in "At Risk".
reference_date = df.agg(max("order_date").alias("reference_date")).first()["reference_date"]

# Days between the dataset's latest order and each customer's last order
customer_rfm = customer_metrics.withColumn(
    "recency",
    datediff(lit(reference_date).cast("date"), col("last_order_date"))
)

# Assign the first segment whose recency / frequency / spend thresholds all hold
customer_rfm = customer_rfm.withColumn(
    "customer_segment",
    when((col("recency") <= 30) & (col("order_frequency") >= 5) & (col("total_spent") >= 1000), "Champions")
    .when((col("recency") <= 60) & (col("order_frequency") >= 3) & (col("total_spent") >= 500), "Loyal Customers")
    .when((col("recency") <= 90) & (col("order_frequency") >= 2), "Potential Loyalists")
    .when(col("recency") <= 180, "New Customers")
    .otherwise("At Risk")
)

# Size and average profile of each segment
rfm_summary = customer_rfm.groupBy("customer_segment") \
    .agg(
        count("customer_id").alias("customer_count"),
        avg("total_spent").alias("avg_total_spent"),
        avg("order_frequency").alias("avg_order_frequency"),
        avg("recency").alias("avg_recency")
    )

print("Customer Segmentation (RFM Analysis):")
rfm_summary.show()

# Cell 10: Product Performance Analysis
print("=== PRODUCT PERFORMANCE ANALYSIS ===")

# Units sold, revenue, price level, order count and average discount per product
product_totals = df.groupBy("product_name", "category").agg(
    sum("quantity").alias("total_sold"),
    sum("discounted_amount").alias("revenue"),
    avg("price").alias("avg_price"),
    count("order_id").alias("order_count"),
    avg("discount_percent").alias("avg_discount"),
)

# Discounted revenue earned per unit sold
product_performance = product_totals.withColumn(
    "revenue_per_unit", col("revenue") / col("total_sold")
)

print("Top 10 Products by Revenue:")
product_performance.orderBy(col("revenue").desc()).show(10)

print("Top 10 Products by Quantity Sold:")
product_performance.orderBy(col("total_sold").desc()).show(10)

# Cell 11: Payment Method Analysis
print("=== PAYMENT METHOD ANALYSIS ===")

# Revenue, transaction volume, average ticket and average discount per
# payment method, highest revenue first
payment_analysis = df.groupBy("payment_method") \
    .agg(
        sum("discounted_amount").alias("revenue"),
        count("order_id").alias("transaction_count"),
        avg("discounted_amount").alias("avg_transaction_value"),
        avg("discount_percent").alias("avg_discount_used")
    ) \
    .orderBy(desc("revenue"))

print("Payment Method Performance:")
payment_analysis.show()

# Payment method preference by age group: usage count of each method
# inside each age bracket, most used method first
payment_age_analysis = df_with_age_group.groupBy("age_group", "payment_method") \
    .agg(count("order_id").alias("usage_count")) \
    .orderBy("age_group", desc("usage_count"))

print("Payment Method Preference by Age Group:")
# Five age brackets times five payment methods gives up to 25 rows, more than
# the 20 that show() prints by default; pass the row count so the last age
# bracket is not cut off.
payment_age_analysis.show(payment_age_analysis.count())

# Cell 12: Create Summary Report
print("=== EXECUTIVE SUMMARY REPORT ===")

# Headline numbers for the whole dataset; each aggregate yields a single
# row, so the value is read from that row's first field
total_revenue = df.agg(sum("discounted_amount")).first()[0]
total_orders = df.count()
unique_customers = df.select("customer_id").distinct().count()
avg_order_value = df.agg(avg("discounted_amount")).first()[0]

print(f"""
BUSINESS PERFORMANCE SUMMARY
========================================
Total Revenue: ${total_revenue:,.2f}
Total Orders: {total_orders:,}
Unique Customers: {unique_customers:,}
Average Order Value: ${avg_order_value:.2f}
Revenue per Customer: ${total_revenue/unique_customers:.2f}
Orders per Customer: {total_orders/unique_customers:.1f}
""")

# Leaders per dimension
print("TOP PERFORMERS:")
print("-" * 40)

# category_revenue is already sorted by revenue, so its first row is the leader
top_category = category_revenue.first()
print(f"Best Category: {top_category['category']} (${top_category['total_revenue']:,.2f})")

# regional_performance is likewise sorted by descending revenue
top_region = regional_performance.first()
print(f"Best Region: {top_region['region']} (${top_region['revenue']:,.2f})")

# monthly_revenue is chronological, so re-sort it by revenue first
months_by_revenue = monthly_revenue.orderBy(col("revenue").desc())
best_month = months_by_revenue.first()
print(f"Best Month: {best_month['year']}-{best_month['month']:02d} (${best_month['revenue']:,.2f})")

# Cell 13: Data Quality and Validation
print("=== DATA QUALITY VALIDATION ===")

# Each check is evaluated right before it is reported and prints True when it passes
print("Data Quality Checks:")

order_ids_unique = df.select('order_id').distinct().count() == df.count()
print(f"1. No duplicate order IDs: {order_ids_unique}")

prices_positive = df.filter(col('price') <= 0).count() == 0
print(f"2. All prices > 0: {prices_positive}")

quantities_positive = df.filter(col('quantity') <= 0).count() == 0
print(f"3. All quantities > 0: {quantities_positive}")

dates_present = df.filter(col('order_date').isNull()).count() == 0
print(f"4. All dates valid: {dates_present}")

age_out_of_range = (col('customer_age') < 18) | (col('customer_age') > 100)
ages_in_range = df.filter(age_out_of_range).count() == 0
print(f"5. Customer age in valid range: {ages_in_range}")

# Extended statistics (including quartiles) for the numeric columns
print("\nStatistical Summary:")
numeric_columns = df.select("price", "quantity", "total_amount", "discount_percent")
numeric_columns.summary().show()

# Cell 14: Save Results (Optional)
print("=== SAVING RESULTS ===")

# Pull the small aggregate tables into pandas so they can be written out
# with plain pandas I/O (only sensible for small results)
category_revenue_pd, monthly_revenue_pd, regional_performance_pd = (
    spark_frame.toPandas()
    for spark_frame in (category_revenue, monthly_revenue, regional_performance)
)

# Save to CSV files (optional - uncomment if needed)
# category_revenue_pd.to_csv('category_revenue.csv', index=False)
# monthly_revenue_pd.to_csv('monthly_revenue.csv', index=False)
# regional_performance_pd.to_csv('regional_performance.csv', index=False)

print("Analysis completed! Key insights:")
print("\n".join([
    "1. Category performance varies significantly",
    "2. Clear seasonal patterns in sales",
    "3. Customer segments show different behaviors",
    "4. Regional preferences differ by product category",
    "5. Payment methods correlate with customer age groups",
]))

# Cell 15: Cleanup
print("=== CLEANUP ===")

# The session is deliberately left running for further exploration
# spark.stop()  # Uncomment when completely done

print("Analysis complete! Spark session is still active for further exploration.")
spark_ui_url = spark.sparkContext.uiWebUrl
print(f"Spark UI: {spark_ui_url}")

Spark Version: 3.5.1
Spark UI available at: http://45c9ced5cb98:4040
Generating sample e-commerce data...
Generated 10000 sample records
DataFrame created with 10000 records

Schema:
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- region: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- discount_percent: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- is_premium_customer: boolean (nullable = true)
 |-- discounted_amount: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)

=== BASIC DATA EXPLORATION ===
Total Records: 10000
Total Columns: 16

Sample Data:
+----------+-----------+------------------+-