In [None]:
# ========== CELL 1: Initialize PySpark ==========
from urllib.parse import urlparse

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Create or get SparkSession.
# NOTE: if a session already exists in this kernel, getOrCreate() returns it and
# static settings (master, executor memory/cores) below cannot change on it —
# restart the kernel to apply new values.
spark = SparkSession.builder \
    .appName("PySpark-Jupyter-Tutorial") \
    .master("spark://spark-master:7077") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")

# The UI binds to 4040, or to the next free port (4041, ...) when 4040 is taken,
# so read the actual port instead of assuming 4040. The host is kept as
# "localhost" on the assumption that the port is forwarded from the container —
# adjust if your setup differs.
ui_url = spark.sparkContext.uiWebUrl  # None when spark.ui.enabled=false
if ui_url:
    print(f"Spark UI: http://localhost:{urlparse(ui_url).port}")
else:
    print("Spark UI: disabled")

In [None]:
# ========== CELL 2: Test HDFS Connection ==========
# List the HDFS root directory with the `hdfs` CLI.
# The CLI reads its own Hadoop configuration, not the `spark.hadoop.fs.defaultFS`
# set on the SparkSession, so the namenode URI is passed explicitly. The CLI is
# also an optional tool in the notebook image (Spark talks to HDFS through its
# own client), so a missing binary or failing command is reported rather than
# crashing the cell or printing an empty listing.
import subprocess

HDFS_ROOT = "hdfs://namenode:9000/"

try:
    result = subprocess.run(['hdfs', 'dfs', '-ls', HDFS_ROOT], capture_output=True, text=True)
except FileNotFoundError:
    result = None
    print("`hdfs` CLI not found on PATH; skipping CLI listing.")
else:
    print("HDFS Root Directory:")
    print(result.stdout)
    if result.returncode != 0:
        # stderr carries the actual reason (connection refused, permission, ...).
        print(f"hdfs dfs -ls failed (exit code {result.returncode}):")
        print(result.stderr)

In [None]:
# ========== CELL 3: Create Sample Dataset ==========
# Generate sample e-commerce data
from datetime import datetime, timedelta
import random

# A dedicated, seeded RNG makes "Restart & Run All" regenerate exactly the same
# transactions, so the aggregates, plots and clusters downstream are reproducible.
SEED = 42
N_TRANSACTIONS = 10_000
rng = random.Random(SEED)

# Generate dates: 90 consecutive days starting 2024-01-01
start_date = datetime(2024, 1, 1)
dates = [start_date + timedelta(days=x) for x in range(90)]

# Generate sample transactions
transactions = []
products = ['Laptop', 'Phone', 'Tablet', 'Headphones', 'Watch']
categories = ['Electronics', 'Electronics', 'Electronics', 'Audio', 'Wearables']  # categories[i] belongs to products[i]
customers = [f'Customer_{i}' for i in range(1, 101)]

# Pair each product with its category once, so a draw can never mismatch them.
catalog = list(zip(products, categories))

for _ in range(N_TRANSACTIONS):
    date = rng.choice(dates)
    product, category = rng.choice(catalog)
    transactions.append({
        'date': date.strftime('%Y-%m-%d'),
        'customer_id': rng.choice(customers),
        'product': product,
        'category': category,
        'quantity': rng.randint(1, 5),
        'price': rng.uniform(50, 2000),
        'discount': rng.uniform(0, 0.3)
    })

# Create DataFrame (schema inferred from the dicts; `date` stays a 'YYYY-MM-DD' string)
df = spark.createDataFrame(transactions)
df.show(5)
print(f"Total records: {df.count()}")

In [None]:
# ========== CELL 4: Save to HDFS ==========
# Write the same DataFrame to HDFS once per file format so the formats can be
# compared when read back; any existing output at each target is replaced.
hdfs_base = "hdfs://namenode:9000/jupyter-data"

# (label shown in the log, Spark data source name, writer options).
# Parquet is the recommended format; CSV needs a header row to round-trip names.
output_formats = [
    ("Parquet", "parquet", {}),
    ("CSV", "csv", {"header": "true"}),
    ("JSON", "json", {}),
]

for label, source, writer_options in output_formats:
    target_path = f"{hdfs_base}/transactions.{source}"
    df.write.mode("overwrite").options(**writer_options).format(source).save(target_path)
    print(f"✓ Saved as {label}")

In [None]:
# ========== CELL 5: Read from HDFS ==========
# Read each copy back from HDFS and check that the round trip kept every row.
df_parquet = spark.read.parquet(f"{hdfs_base}/transactions.parquet")
# CSV stores no type information: without inferSchema every column of df_csv
# comes back as a string.
df_csv = spark.read.option("header", "true").csv(f"{hdfs_base}/transactions.csv")
df_json = spark.read.json(f"{hdfs_base}/transactions.json")

expected_count = df.count()
for label, frame in [("Parquet", df_parquet), ("CSV", df_csv), ("JSON", df_json)]:
    row_count = frame.count()
    print(f"{label} records: {row_count}")
    # Fail loudly instead of relying on the reader to eyeball the numbers.
    assert row_count == expected_count, (
        f"{label} round trip changed the row count: {row_count} != {expected_count}"
    )

In [None]:
# ========== CELL 6: Data Analysis with SQL ==========
# Expose the in-memory DataFrame `df` to Spark SQL as the temp view
# "transactions"; the SQL in the following cells reads from this view.
df.createOrReplaceTempView("transactions")

# Per-category rollup: number of transactions, units sold, mean unit price and
# net revenue (price * quantity after discount), highest revenue first.
query1 = """
SELECT 
    category,
    COUNT(*) as total_transactions,
    SUM(quantity) as total_quantity,
    ROUND(AVG(price), 2) as avg_price,
    ROUND(SUM(price * quantity * (1 - discount)), 2) as total_revenue
FROM transactions
GROUP BY category
ORDER BY total_revenue DESC
"""
category_summary = spark.sql(query1)
category_summary.show()

# The result has one row per category, so collecting it to the driver as a
# pandas frame for matplotlib is cheap.
category_pd = category_summary.toPandas()

In [None]:
# ========== CELL 7: Visualization ==========
# Four-panel summary of the per-category rollup held in `category_pd`.

# Matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8-*" and
# later dropped the old names, so use whichever one this installation ships.
style_name = ('seaborn-v0_8-darkgrid'
              if 'seaborn-v0_8-darkgrid' in plt.style.available
              else 'seaborn-darkgrid')
plt.style.use(style_name)


def _category_bar(ax, column, title, ylabel, color=None):
    """Draw ``category_pd[column]`` as one bar per category on *ax*.

    ``color=None`` keeps matplotlib's default colour-cycle behaviour.
    """
    ax.bar(category_pd['category'], category_pd[column], color=color)
    ax.set_title(title)
    ax.set_xlabel('Category')
    ax.set_ylabel(ylabel)


fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Plot 1: Revenue by Category
_category_bar(axes[0, 0], 'total_revenue', 'Total Revenue by Category', 'Revenue ($)')

# Plot 2: Average Price by Category
_category_bar(axes[0, 1], 'avg_price', 'Average Price by Category', 'Avg Price ($)', color='orange')

# Plot 3: Transactions by Category
axes[1, 0].pie(category_pd['total_transactions'], labels=category_pd['category'], autopct='%1.1f%%')
axes[1, 0].set_title('Transaction Distribution')

# Plot 4: Quantity Sold
_category_bar(axes[1, 1], 'total_quantity', 'Total Quantity Sold', 'Quantity', color='green')

plt.tight_layout()
plt.show()

In [None]:
# ========== CELL 8: Time Series Analysis ==========
# Roll transactions up to calendar months and chart net revenue over time.
monthly_sales = spark.sql("""
SELECT 
    DATE_FORMAT(date, 'yyyy-MM') as month,
    COUNT(*) as transactions,
    ROUND(SUM(price * quantity * (1 - discount)), 2) as revenue
FROM transactions
GROUP BY DATE_FORMAT(date, 'yyyy-MM')
ORDER BY month
""")

# One row per month, so collecting to pandas is cheap.
monthly_pd = monthly_sales.toPandas()

# Explicit figure/axes API rather than the pyplot state machine.
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(monthly_pd['month'], monthly_pd['revenue'], marker='o')
ax.set_title('Monthly Revenue Trend')
ax.set_xlabel('Month')
ax.set_ylabel('Revenue ($)')
plt.setp(ax.get_xticklabels(), rotation=45)
ax.grid(True, alpha=0.3)
plt.show()

In [None]:
# ========== CELL 9: Customer Analysis ==========
# The ten highest-spending customers by net revenue (after discount), with
# their number of transactions and mean net value per transaction.
top_customers_sql = """
SELECT 
    customer_id,
    COUNT(*) as purchase_count,
    ROUND(SUM(price * quantity * (1 - discount)), 2) as total_spent,
    ROUND(AVG(price * quantity * (1 - discount)), 2) as avg_order_value
FROM transactions
GROUP BY customer_id
ORDER BY total_spent DESC
LIMIT 10
"""
top_customers = spark.sql(top_customers_sql)

print("Top 10 Customers by Revenue:")
top_customers.show()

In [None]:
# ========== CELL 10: Machine Learning - Customer Segmentation ==========
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Prepare customer features.
# avg_order_value is the *net* (post-discount) value per transaction so that it
# is on the same basis as total_spent here and as the customer report in Cell 9.
customer_features = spark.sql("""
SELECT 
    customer_id,
    COUNT(*) as purchase_frequency,
    AVG(price * quantity * (1 - discount)) as avg_order_value,
    SUM(price * quantity * (1 - discount)) as total_spent,
    AVG(discount) as avg_discount_used
FROM transactions
GROUP BY customer_id
""")

# Prepare features for ML
assembler = VectorAssembler(
    inputCols=["purchase_frequency", "avg_order_value", "total_spent", "avg_discount_used"],
    outputCol="features"
)

feature_df = assembler.transform(customer_features)

# K-Means clusters by Euclidean distance, so raw features on wildly different
# scales (total_spent sums price*quantity over many rows, avg_discount_used is
# in 0-0.3) would let the largest one decide every cluster. Scale each feature
# to unit variance first; centering is skipped because it does not change
# Euclidean distances.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features",
                        withMean=False, withStd=True)
scaler_model = scaler.fit(feature_df)
scaled_df = scaler_model.transform(feature_df)

# K-Means clustering on the scaled features.
# NOTE(review): Cell 11 persists only `model`; scoring new customers also needs
# `scaler_model` — consider saving it alongside.
kmeans = KMeans(k=3, seed=42, featuresCol="scaled_features")
model = kmeans.fit(scaled_df)

# Make predictions
predictions = model.transform(scaled_df)

# Show cluster assignments
print("Customer Segments:")
predictions.select("customer_id", "prediction").show(10)

# Evaluate clustering in the same (scaled) space the clusters were fit in.
evaluator = ClusteringEvaluator(featuresCol="scaled_features")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

In [None]:
# ========== CELL 11: Save ML Model to HDFS ==========
from pyspark.ml.clustering import KMeansModel

# Save model (replacing any previous version at the same path)
model_path = "hdfs://namenode:9000/models/customer-segmentation"
model.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

# Load model and confirm it really is the fitted model — not merely that
# load() did not raise — by comparing cluster centers.
loaded_model = KMeansModel.load(model_path)
original_centers = model.clusterCenters()
loaded_centers = loaded_model.clusterCenters()
assert len(loaded_centers) == len(original_centers), "cluster count changed on reload"
assert all(np.allclose(a, b) for a, b in zip(original_centers, loaded_centers)), \
    "reloaded cluster centers differ from the fitted model"
print("Model loaded successfully!")

# ========== CELL 12: Clean Up ==========
# Set STOP_SPARK = True to stop the Spark session when you are done.
# It defaults to False so that "Run All" leaves the session usable for any
# further exploration in this kernel.
STOP_SPARK = False

if STOP_SPARK:
    spark.stop()
    print("Spark session stopped.")
else:
    print("Remember to stop Spark session when finished!")