# E-commerce Transaction Data Analysis with PySpark

## DS551 - Kafka + Spark Assignment

### Objective
Perform Exploratory Data Analysis (EDA) on transaction data streamed through Kafka using PySpark.

### Why PySpark instead of Pandas?
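- **Pandas**: Loads the entire dataset into a single machine's memory → struggles with large files (1M+ rows)
- **PySpark**: Distributed processing → splits the data into partitions and processes them in parallel, so it scales to datasets larger than one machine's memory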
- **Lazy Evaluation**: PySpark optimizes operations before execution
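
To make the lazy-evaluation point concrete, here is a plain-Python analogy built on generators. It is not Spark code, and the `read_rows` helper and the sample amounts are made up for illustration. Building the pipeline does no work (like Spark transformations), and only consuming it triggers the reads (like a Spark action such as `count()` or `show()`).

```python
# Plain-Python analogy for lazy evaluation (illustrative only, not Spark itself).
read_log = []  # records every row that is actually read


def read_rows():
    """Hypothetical data source that logs each row as it is produced."""
    for amount in [10.0, 250.0, 75.5, 300.0]:
        read_log.append(amount)
        yield amount


# "Transformations": these lines only describe the work, nothing runs yet.
large_amounts = (a for a in read_rows() if a > 50)
doubled = (a * 2 for a in large_amounts)
rows_read_before_action = len(read_log)

# "Action": consuming the pipeline forces every step to execute.
result = list(doubled)
rows_read_after_action = len(read_log)

print(rows_read_before_action, rows_read_after_action, result)
```

Spark goes further than this sketch: because it sees the whole chain of transformations before running anything, it can reorder and combine steps into an optimized plan.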

---

## 1. Setup and Imports

In [None]:
# Import PySpark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, count, sum as spark_sum, avg, max as spark_max, min as spark_min,
    hour, dayofweek, to_timestamp, desc
)
from pyspark.sql.types import DoubleType

# Import visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns

# Set plot style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 6)

print("✅ Imports successful!")

## 2. Create Spark Session

**SparkSession**: Entry point to PySpark functionality. Think of it as the "connection" to Spark.

In [None]:
# Create Spark Session
spark = SparkSession.builder \
    .appName("Transaction Data EDA") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

# Check Spark version
print("✅ Spark Session created!")
print(f"📊 Spark Version: {spark.version}")
print("🔗 Spark UI: http://localhost:4040")

## 3. Load Data from CSV

Loading data that came through Kafka and was saved by the consumer.

In [None]:
# Load data into Spark DataFrame
df = spark.read.csv(
    "/home/jovyan/data/transactions.csv",  # Path inside container
    header=True,
    inferSchema=True  # Automatically detect data types
)

print("✅ Data loaded successfully!")
print(f"📊 Total records: {df.count():,}")

## 4. Data Exploration

### 4.1 Schema (Column Types)

In [None]:
# Display schema
print("📋 Data Schema:")
df.printSchema()

### 4.2 Sample Data

In [None]:
# Show first 10 rows
print("📄 Sample Data (first 10 rows):")
df.show(10, truncate=False)

### 4.3 Basic Statistics

In [None]:
# Descriptive statistics for numeric columns
print("📈 Statistical Summary:")
df.select("product_price", "quantity", "total_amount").describe().show()

### 4.4 Check for Null Values

In [None]:
# Count null values in each column
from pyspark.sql.functions import col, sum as spark_sum, when

null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
])

print("🔍 Null Value Counts:")
null_counts.show()
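
As a rough plain-Python picture of what the null-count expression above computes: each column gets a 1 for every missing value and a 0 otherwise, and the sum of those flags is the null count. The sample rows below are hypothetical and are not taken from `transactions.csv`.

```python
# Plain-Python sketch of sum(when(col.isNull(), 1).otherwise(0)) per column.
# The rows are made-up examples; None stands in for a Spark NULL.
sample_rows = [
    {"transaction_id": "T1", "city": "Austin", "total_amount": 20.0},
    {"transaction_id": "T2", "city": None, "total_amount": 35.5},
    {"transaction_id": "T3", "city": None, "total_amount": None},
]
sample_columns = ["transaction_id", "city", "total_amount"]

# For each column, flag missing values with 1 and add the flags up.
null_counts_py = {
    c: sum(1 if row[c] is None else 0 for row in sample_rows)
    for c in sample_columns
}

print(null_counts_py)
```

Note that `isNull()` only catches true NULLs. Empty strings or `NaN` values in a CSV would need separate checks.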

## 5. Data Analysis

### 5.1 Sales by Product Category

In [None]:
# Group by product category and calculate total sales
category_sales = df.groupBy("product_category") \
    .agg(
        count("transaction_id").alias("num_transactions"),
        spark_sum("total_amount").alias("total_revenue")
    ) \
    .orderBy(desc("total_revenue"))

print("💰 Sales by Product Category:")
category_sales.show()
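
The group-and-aggregate step above can be pictured in plain Python as follows. This is only a sketch on hypothetical in-memory tuples, not how Spark executes the query. It also mirrors one detail worth knowing: `count("transaction_id")` counts only rows where that column is non-null, whereas `count("*")` (used in the later cells) counts every row.

```python
from collections import defaultdict

# Hypothetical (transaction_id, product_category, total_amount) rows.
transactions = [
    ("T1", "Electronics", 300.0),
    ("T2", "Books", 20.0),
    ("T3", "Electronics", 150.0),
    ("T4", "Clothing", 60.0),
    ("T5", "Books", 15.0),
    (None, "Clothing", 40.0),  # missing id: summed, but not counted
]

totals = defaultdict(lambda: {"num_transactions": 0, "total_revenue": 0.0})
for transaction_id, category, amount in transactions:
    if transaction_id is not None:  # count(column) skips nulls
        totals[category]["num_transactions"] += 1
    totals[category]["total_revenue"] += amount

# Equivalent of orderBy(desc("total_revenue")).
ranked = sorted(
    totals.items(), key=lambda item: item[1]["total_revenue"], reverse=True
)

for category, stats in ranked:
    print(category, stats["num_transactions"], stats["total_revenue"])
```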

### 5.2 Sales by Payment Method

In [None]:
# Analyze payment methods
payment_analysis = df.groupBy("payment_method") \
    .agg(
        count("*").alias("num_transactions"),
        avg("total_amount").alias("avg_transaction")
    ) \
    .orderBy(desc("num_transactions"))

print("💳 Payment Method Analysis:")
payment_analysis.show()

### 5.3 Top 10 Cities by Revenue

In [None]:
# Top cities by total revenue
city_revenue = df.groupBy("city") \
    .agg(
        count("*").alias("num_transactions"),
        spark_sum("total_amount").alias("total_revenue")
    ) \
    .orderBy(desc("total_revenue")) \
    .limit(10)

print("🏙️ Top 10 Cities by Revenue:")
city_revenue.show()

### 5.4 Average Transaction Amount by Category

In [None]:
# Average transaction amount per category
avg_transaction = df.groupBy("product_category") \
    .agg(
        avg("total_amount").alias("avg_amount"),
        spark_min("total_amount").alias("min_amount"),
        spark_max("total_amount").alias("max_amount")
    ) \
    .orderBy(desc("avg_amount"))

print("📊 Average Transaction by Category:")
avg_transaction.show()

## 6. Data Visualizations with Matplotlib

### 6.1 Total Revenue by Product Category (Bar Chart)

In [None]:
# Convert Spark DataFrame to Pandas for plotting
category_sales_pd = category_sales.toPandas()

# Create bar chart
plt.figure(figsize=(12, 6))
plt.bar(category_sales_pd['product_category'], 
        category_sales_pd['total_revenue'],
        color='steelblue')
plt.xlabel('Product Category', fontsize=12)
plt.ylabel('Total Revenue ($)', fontsize=12)
plt.title('Total Revenue by Product Category', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

print("✅ Bar chart created!")

### 6.2 Transaction Amount Distribution (Histogram)

In [None]:
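# Sample ~10% of rows for the histogram (avoids pulling the full dataset into driver memory);
# the fixed seed makes the sample, and therefore the plot, reproducible across runs
sample_amounts = df.select("total_amount").sample(withReplacement=False, fraction=0.1, seed=42).toPandas()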

plt.figure(figsize=(12, 6))
plt.hist(sample_amounts['total_amount'], bins=50, color='coral', edgecolor='black')
plt.xlabel('Transaction Amount ($)', fontsize=12)
plt.ylabel('Frequency', fontsize=12)
plt.title('Distribution of Transaction Amounts', fontsize=14, fontweight='bold')
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

print("✅ Histogram created!")
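
A note on the sampling step used for the histogram: `sample(False, 0.1)` keeps each row independently with probability 0.1, so the result is roughly, not exactly, 10% of the data, and passing a seed makes the draw repeatable. The plain-Python sketch below imitates that behavior. The `bernoulli_sample` helper is hypothetical and is not part of PySpark.

```python
import random


def bernoulli_sample(values, fraction, seed):
    """Keep each value independently with probability `fraction`."""
    rng = random.Random(seed)
    return [v for v in values if rng.random() < fraction]


amounts = list(range(10_000))  # stand-in for the total_amount column

first_draw = bernoulli_sample(amounts, fraction=0.1, seed=42)
second_draw = bernoulli_sample(amounts, fraction=0.1, seed=42)

# The size is close to 1,000 but usually not exactly 1,000.
print(len(first_draw))
# The same seed reproduces the same sample.
print(first_draw == second_draw)
```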

### 6.3 Payment Method Distribution (Pie Chart)

In [None]:
# Convert payment method data to Pandas
payment_pd = payment_analysis.toPandas()

# Create pie chart
plt.figure(figsize=(10, 8))
colors = ['#ff9999', '#66b3ff', '#99ff99', '#ffcc99', '#ff99cc']
plt.pie(payment_pd['num_transactions'], 
        labels=payment_pd['payment_method'],
        autopct='%1.1f%%',
        colors=colors,
        startangle=90)
plt.title('Payment Method Distribution', fontsize=14, fontweight='bold')
plt.axis('equal')
plt.tight_layout()
plt.show()

print("✅ Pie chart created!")

### 6.4 Top 10 Cities by Revenue (Horizontal Bar Chart)

In [None]:
# Convert city revenue data to Pandas
city_revenue_pd = city_revenue.toPandas()

# Create horizontal bar chart
plt.figure(figsize=(12, 8))
plt.barh(city_revenue_pd['city'], 
         city_revenue_pd['total_revenue'],
         color='mediumseagreen')
plt.xlabel('Total Revenue ($)', fontsize=12)
plt.ylabel('City', fontsize=12)
plt.title('Top 10 Cities by Revenue', fontsize=14, fontweight='bold')
plt.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()

print("✅ Horizontal bar chart created!")

### 6.5 Number of Transactions by Category (Bar Chart)

In [None]:
# Create transaction count bar chart
plt.figure(figsize=(12, 6))
plt.bar(category_sales_pd['product_category'], 
        category_sales_pd['num_transactions'],
        color='mediumpurple')
plt.xlabel('Product Category', fontsize=12)
plt.ylabel('Number of Transactions', fontsize=12)
plt.title('Number of Transactions by Product Category', fontsize=14, fontweight='bold')
plt.xticks(rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

print("✅ Transaction count chart created!")

## 7. Key Insights and Summary

In [None]:
# Calculate overall metrics
total_records = df.count()
total_revenue = df.agg(spark_sum("total_amount")).collect()[0][0]
# Named avg_transaction_amount so it does not overwrite the avg_transaction DataFrame from Section 5.4
avg_transaction_amount = df.agg(avg("total_amount")).collect()[0][0]
unique_customers = df.select("customer_id").distinct().count()

print("=" * 80)
print("📊 ANALYSIS SUMMARY")
print("=" * 80)
print(f"📄 Total Transactions: {total_records:,}")
print(f"💰 Total Revenue: ${total_revenue:,.2f}")
print(f"💵 Average Transaction: ${avg_transaction_amount:,.2f}")
print(f"👥 Unique Customers: {unique_customers:,}")
print("=" * 80)

# Top category
top_category = category_sales.first()
print(f"\n🏆 Top Category: {top_category['product_category']}")
print(f"   Revenue: ${top_category['total_revenue']:,.2f}")
print(f"   Transactions: {top_category['num_transactions']:,}")

# Most popular payment method
top_payment = payment_analysis.first()
print(f"\n💳 Most Popular Payment Method: {top_payment['payment_method']}")
print(f"   Transactions: {top_payment['num_transactions']:,}")

print("\n✅ EDA Complete!")

## 8. Cleanup

In [None]:
# Stop Spark session (optional - good practice)
# spark.stop()
print("✅ Notebook execution complete!")

---

## Conclusion

This notebook demonstrated:
1. ✅ Loading large datasets with PySpark
2. ✅ Performing aggregations and transformations
3. ✅ Creating visualizations with matplotlib
4. ✅ Analyzing e-commerce transaction patterns

**Key Takeaways:**
- PySpark handles large datasets efficiently through distributed processing
- `.toPandas()` converts Spark DataFrames to Pandas for visualization; it collects all rows to the driver, so aggregate or sample first
- Lazy evaluation means operations are optimized before execution

**Next Steps:**
- Deploy to OpenShift/Kubernetes
- Integrate with real-time Kafka streaming
- Add more advanced analytics

---
**Author**: DS551 Student  
**Date**: 2025-11-14  
**Assignment**: Kafka + PySpark EDA
