# Big Data Sandbox - Getting Started Tutorial

Welcome to the Big Data Sandbox! This notebook will guide you through the basic operations with our integrated big data tools.

## What you'll learn:
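1. Connecting to Spark with MinIO (S3) support
2. Generating sample data and processing it with PySpark (SQL and DataFrame API)
3. Writing results to MinIO
4. Preparing messages for Kafka
5. Where to go next with Airflow, MinIO and Kafka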

## 1. Setup and Imports

In [None]:
# Import required libraries
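from pyspark.sql import SparkSession
# Explicit imports instead of `import *`: the star import also shadows Python's
# built-in round(), which breaks generate_sales_data below. Note that sum, max
# and min here are still the Spark versions, not the Python built-ins.
from pyspark.sql.functions import (
    avg, col, count, countDistinct, max, min, month, sum, to_date, when, year,
)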
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json

# Set up plotting
plt.style.use('seaborn-v0_8')
%matplotlib inline

## 2. Initialize Spark Session with MinIO Support

In [None]:
# Create Spark session with MinIO configuration
spark = (
    SparkSession.builder
    .appName("BigDataSandboxTutorial")
    .master("spark://spark-master:7077")
    # Point Hadoop's S3A connector at the MinIO service
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    # MinIO's default credentials: acceptable in a local sandbox only
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    # Address buckets as http://minio:9000/<bucket> instead of <bucket>.minio
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # hadoop-aws must match the Hadoop version your Spark build ships with
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")
# Driver UI of this application (Spark moves to 4041, 4042, ... if 4040 is taken)
print("Spark UI: http://localhost:4040")
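
The credentials above are hard-coded for convenience. A minimal sketch of an alternative, assuming you export them as environment variables: the variable names `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY` and the helper `minio_s3a_conf` are hypothetical, and the defaults fall back to the sandbox values.

```python
import os


def minio_s3a_conf(endpoint="http://minio:9000"):
    """Hypothetical helper: S3A settings for MinIO with credentials taken from the environment.

    MINIO_ACCESS_KEY / MINIO_SECRET_KEY are assumed variable names; when they are
    unset, the sandbox defaults (minioadmin) are used.
    """
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
        "spark.hadoop.fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
        "spark.hadoop.fs.s3a.path.style.access": "true",
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }


s3a_conf = minio_s3a_conf()
for key, value in sorted(s3a_conf.items()):
    # Mask the secret so it never ends up in saved notebook output
    shown = "****" if key.endswith("secret.key") else value
    print(f"{key} = {shown}")
```

To use it, apply each pair to the builder before `getOrCreate()`, e.g. `for key, value in s3a_conf.items(): builder = builder.config(key, value)`, where `builder` starts as `SparkSession.builder`.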

## 3. Generate Sample Data

In [None]:
# Generate sample sales data
import random
from datetime import datetime, timedelta

def generate_sales_data(num_records=1000):
    """Generate sample sales data"""
    
    products = ['Laptop', 'Phone', 'Tablet', 'Watch', 'Headphones', 'Camera', 'Speaker', 'Monitor']
    categories = ['Electronics', 'Audio', 'Computing', 'Mobile']
    regions = ['North', 'South', 'East', 'West', 'Central']
    
    data = []
    start_date = datetime.now() - timedelta(days=90)
    
    for i in range(num_records):
        date = start_date + timedelta(days=random.randint(0, 90))
        # Draw the product once so product_id and product_name stay consistent
        product_idx = random.randrange(len(products))
        data.append({
            'transaction_id': f'TRX{i:06d}',
            'date': date.strftime('%Y-%m-%d'),
            'customer_id': f'CUST{random.randint(1, 200):04d}',
            'product_id': f'PROD{product_idx + 1:03d}',
            'product_name': products[product_idx],
            'quantity': random.randint(1, 5),
            'price': round(random.uniform(50, 2000), 2),
            'category': random.choice(categories),
            'region': random.choice(regions)
        })
    
    return data

# Generate and create DataFrame
sales_data = generate_sales_data(1000)
sales_df = spark.createDataFrame(sales_data)

print(f"Generated {sales_df.count()} records")
sales_df.show(5)
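
`generate_sales_data` uses the global `random` module and `datetime.now()`, so every run produces different numbers. If you want reproducible results, one option is a seeded `random.Random` instance and a fixed end date. The function `generate_sales_rows`, the seed `42` and the end date below are illustrative choices, not part of the sandbox.

```python
import builtins
import random
from datetime import date, timedelta


def generate_sales_rows(num_records, seed=42, end=date(2024, 1, 1)):
    """Hypothetical reproducible variant of generate_sales_data.

    A private random.Random(seed) yields the same rows on every run, and a fixed
    end date removes the dependency on datetime.now().
    """
    rng = random.Random(seed)
    products = ['Laptop', 'Phone', 'Tablet', 'Watch', 'Headphones', 'Camera', 'Speaker', 'Monitor']
    categories = ['Electronics', 'Audio', 'Computing', 'Mobile']
    regions = ['North', 'South', 'East', 'West', 'Central']
    start = end - timedelta(days=90)
    rows = []
    for i in range(num_records):
        idx = rng.randrange(len(products))  # one draw keeps product_id and product_name consistent
        rows.append({
            'transaction_id': f'TRX{i:06d}',
            'date': (start + timedelta(days=rng.randint(0, 90))).isoformat(),
            'customer_id': f'CUST{rng.randint(1, 200):04d}',
            'product_id': f'PROD{idx + 1:03d}',
            'product_name': products[idx],
            'quantity': rng.randint(1, 5),
            # builtins.round: `from pyspark.sql.functions import *` would shadow round()
            'price': builtins.round(rng.uniform(50, 2000), 2),
            'category': rng.choice(categories),
            'region': rng.choice(regions),
        })
    return rows


run_a = generate_sales_rows(5)
run_b = generate_sales_rows(5)
print(run_a == run_b)  # True: same seed, same rows
```

The result can be passed to `spark.createDataFrame(...)` exactly like `sales_data`.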

## 4. Data Processing with PySpark

In [None]:
# Add calculated columns
sales_df = sales_df.withColumn("total_amount", col("quantity") * col("price"))
sales_df = sales_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
sales_df = sales_df.withColumn("year", year("date"))
sales_df = sales_df.withColumn("month", month("date"))

# Register as temp view for SQL queries
sales_df.createOrReplaceTempView("sales")

# SQL query example
top_products = spark.sql("""
    SELECT 
        product_name,
        COUNT(*) as transaction_count,
        SUM(total_amount) as total_revenue,
        AVG(total_amount) as avg_transaction_value
    FROM sales
    GROUP BY product_name
    ORDER BY total_revenue DESC
    LIMIT 5
""")

print("Top 5 Products by Revenue:")
top_products.show()
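
The SQL query groups rows by `product_name`, computes a count, a sum and an average of `total_amount` per group, then sorts and truncates. The same logic in plain Python, over a few hypothetical rows, can help when checking the Spark output by hand; `top_products_py` is an illustrative name, not a sandbox function.

```python
from collections import defaultdict


def top_products_py(rows, limit=5):
    """Plain-Python equivalent of the GROUP BY query above (illustrative only)."""
    amounts_by_product = defaultdict(list)
    for r in rows:
        # total_amount = quantity * price, as in the withColumn step
        amounts_by_product[r['product_name']].append(r['quantity'] * r['price'])
    stats = []
    for name, amounts in amounts_by_product.items():
        revenue = 0.0
        for amount in amounts:  # explicit loop: in this notebook sum() is Spark's, not Python's
            revenue += amount
        # (product_name, transaction_count, total_revenue, avg_transaction_value)
        stats.append((name, len(amounts), revenue, revenue / len(amounts)))
    stats.sort(key=lambda s: s[2], reverse=True)  # ORDER BY total_revenue DESC
    return stats[:limit]                          # LIMIT


sample = [
    {'product_name': 'Laptop', 'quantity': 2, 'price': 1000.0},
    {'product_name': 'Phone', 'quantity': 1, 'price': 500.0},
    {'product_name': 'Laptop', 'quantity': 1, 'price': 800.0},
    {'product_name': 'Watch', 'quantity': 3, 'price': 100.0},
]
for row in top_products_py(sample, limit=2):
    print(row)
```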

## 5. Advanced Analytics

In [None]:
# Time series analysis
daily_sales = sales_df.groupBy("date") \
    .agg(
        sum("total_amount").alias("daily_revenue"),
        count("transaction_id").alias("transaction_count"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .orderBy("date")

# Convert to Pandas for visualization
daily_sales_pd = daily_sales.toPandas()

# Plot daily revenue
fig, axes = plt.subplots(2, 1, figsize=(12, 8))

axes[0].plot(daily_sales_pd['date'], daily_sales_pd['daily_revenue'], marker='o')
axes[0].set_title('Daily Revenue Trend')
axes[0].set_xlabel('Date')
axes[0].set_ylabel('Revenue ($)')
axes[0].grid(True, alpha=0.3)

axes[1].bar(daily_sales_pd['date'], daily_sales_pd['unique_customers'], alpha=0.7)
axes[1].set_title('Daily Unique Customers')
axes[1].set_xlabel('Date')
axes[1].set_ylabel('Customer Count')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
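
`count("transaction_id")` counts every transaction, while `countDistinct("customer_id")` counts each customer only once per day. A plain-Python sketch of the same per-day aggregation, with hypothetical rows and a hypothetical `daily_summary` helper, makes the difference visible:

```python
from collections import defaultdict


def daily_summary(rows):
    """Per-day revenue, transaction count and distinct customers (illustrative only)."""
    revenue = defaultdict(float)
    transactions = defaultdict(int)
    customers = defaultdict(set)
    for r in rows:
        day = r['date']
        revenue[day] += r['quantity'] * r['price']
        transactions[day] += 1                 # like count("transaction_id")
        customers[day].add(r['customer_id'])   # like countDistinct("customer_id")
    # Sorting ISO yyyy-MM-dd strings gives chronological order, like orderBy("date")
    return [(d, revenue[d], transactions[d], len(customers[d])) for d in sorted(revenue)]


demo_rows = [
    {'date': '2024-01-02', 'customer_id': 'CUST0001', 'quantity': 1, 'price': 100.0},
    {'date': '2024-01-01', 'customer_id': 'CUST0001', 'quantity': 2, 'price': 50.0},
    {'date': '2024-01-01', 'customer_id': 'CUST0001', 'quantity': 1, 'price': 20.0},
    {'date': '2024-01-01', 'customer_id': 'CUST0002', 'quantity': 1, 'price': 30.0},
]
for entry in daily_summary(demo_rows):
    print(entry)  # (date, daily_revenue, transaction_count, unique_customers)
```

On 2024-01-01, CUST0001 buys twice, so that day has three transactions but only two unique customers.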

## 6. Customer Segmentation

In [None]:
# Customer analysis
customer_stats = sales_df.groupBy("customer_id") \
    .agg(
        sum("total_amount").alias("lifetime_value"),
        count("transaction_id").alias("purchase_count"),
        avg("total_amount").alias("avg_purchase_value"),
        max("date").alias("last_purchase_date"),
        min("date").alias("first_purchase_date")
    )

# Add customer segments
customer_segments = customer_stats.withColumn(
    "segment",
    when(col("lifetime_value") > 5000, "VIP")
    .when(col("lifetime_value") > 2000, "Gold")
    .when(col("lifetime_value") > 500, "Silver")
    .otherwise("Bronze")
)

# Segment distribution
segment_dist = customer_segments.groupBy("segment").count().toPandas()

# Visualize segments
plt.figure(figsize=(10, 6))
plt.subplot(1, 2, 1)
plt.pie(segment_dist['count'], labels=segment_dist['segment'], autopct='%1.1f%%')
plt.title('Customer Segment Distribution')

plt.subplot(1, 2, 2)
segment_revenue = customer_segments.groupBy("segment") \
    .agg(sum("lifetime_value").alias("total_revenue")) \
    .toPandas()
plt.bar(segment_revenue['segment'], segment_revenue['total_revenue'])
plt.title('Revenue by Customer Segment')
plt.xlabel('Segment')
plt.ylabel('Total Revenue ($)')

plt.tight_layout()
plt.show()

print("\nTop 10 VIP Customers:")
customer_segments.filter(col("segment") == "VIP") \
    .orderBy(col("lifetime_value").desc()) \
    .show(10)
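
`when(...).when(...).otherwise(...)` behaves like an `if`/`elif`/`else` chain: the first matching condition wins, and the comparisons are strict, so a lifetime value of exactly 5000 lands in Gold. The hypothetical helper `segment_for` mirrors the thresholds for quick boundary checks:

```python
def segment_for(lifetime_value):
    """Mirror of the when/otherwise chain: first match wins, comparisons are strict (>)."""
    if lifetime_value > 5000:
        return "VIP"
    if lifetime_value > 2000:
        return "Gold"
    if lifetime_value > 500:
        return "Silver"
    return "Bronze"


for value in (5000.01, 5000, 2000, 500.5, 500):
    print(value, segment_for(value))
```

With the generated data, each customer makes about five purchases (1,000 transactions across 200 customers) averaging roughly 3,000 each (mean quantity 3 times a mean price of about 1,025), so expect most customers to land in VIP; adjust the thresholds if you want a more even split.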

## 7. Write Results to MinIO

In [None]:
# Write processed data to MinIO
try:
    # Write customer segments
    customer_segments.write \
        .mode("overwrite") \
        .parquet("s3a://processed/customer_segments")
    
    print("✅ Customer segments written to MinIO")
    
    # Write daily sales
    daily_sales.write \
        .mode("overwrite") \
        .parquet("s3a://processed/daily_sales")
    
    print("✅ Daily sales written to MinIO")
    
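except Exception as e:
    # S3A does not create buckets: the `processed` bucket must already exist in MinIO
    print(f"⚠️ Could not write to MinIO: {e}")
    print("Check that MinIO is reachable at http://minio:9000 and that the 'processed' bucket exists.")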
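
The S3A connector does not create buckets, so both writes fail unless the `processed` bucket already exists in MinIO. The small helper below (hypothetical, standard library only) extracts the bucket name from an `s3a://` path so you know which bucket to create, for example in the MinIO console or with the MinIO client's `mc mb` command.

```python
from urllib.parse import urlparse


def s3a_bucket_and_key(uri):
    """Hypothetical helper: split an s3a:// URI into (bucket, key prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3a":
        raise ValueError(f"expected an s3a:// URI, got {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


for target in ("s3a://processed/customer_segments", "s3a://processed/daily_sales"):
    bucket, prefix = s3a_bucket_and_key(target)
    print(f"{target}: bucket '{bucket}' must exist, data goes under '{prefix}/'")
```

With the MinIO client this could be `mc mb sandbox/processed`, where `sandbox` is a hypothetical alias registered with `mc alias set` for http://localhost:9000.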

## 8. Kafka Integration Example

In [None]:
# Example: prepare data for Kafka
# To publish it, use a client such as kafka-python, or Spark's Kafka sink
# (df.write.format("kafka")), which expects a string or binary `value` column

# Convert recent transactions to JSON for Kafka
recent_transactions = sales_df \
    .filter(col("date") >= (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')) \
    .select(
        "transaction_id",
        "customer_id",
        "product_name",
        "total_amount",
        "region"
    ) \
    .limit(10)

# Convert to JSON format (ready for Kafka)
kafka_messages = recent_transactions.toJSON().collect()

print("Sample Kafka messages:")
for i, msg in enumerate(kafka_messages[:3]):
    print(f"Message {i+1}: {msg}")

print(f"\n📨 {len(kafka_messages)} messages ready for Kafka streaming")
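
Kafka records consist of an optional key and a value, both sent as bytes. Keying each message by `customer_id` means the default partitioner sends all of a customer's events to the same partition, which preserves their order. A sketch of that preparation step; `to_kafka_records` and the sample message are hypothetical.

```python
import json


def to_kafka_records(json_messages, key_field="customer_id"):
    """Hypothetical helper: JSON strings -> (key, value) byte pairs for a Kafka producer."""
    records = []
    for message in json_messages:
        key = str(json.loads(message)[key_field]).encode("utf-8")
        records.append((key, message.encode("utf-8")))
    return records


demo_messages = [
    '{"transaction_id":"TRX000001","customer_id":"CUST0042",'
    '"product_name":"Phone","total_amount":1200.5,"region":"East"}',
]
for key, value in to_kafka_records(demo_messages):
    print(key, len(value))
```

In the notebook you would call `to_kafka_records(kafka_messages)`. With kafka-python, each pair could then go to `producer.send(topic, key=key, value=value)` on a `KafkaProducer(bootstrap_servers="kafka:9092")`; the topic name is yours to choose.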

## 9. Clean Up

In [None]:
# Stop Spark session when done
# spark.stop()
print("🎉 Tutorial completed! Keep the Spark session active for further exploration.")

## Next Steps

Congratulations! You've completed the Getting Started tutorial. Here's what you can explore next:

1. **Try the Airflow UI**: Visit http://localhost:8080 to trigger DAGs
2. **Explore MinIO**: Check http://localhost:9000 to see your stored data
3. **Monitor Kafka**: Use http://localhost:9001 to view topics and messages
4. **Advanced Notebooks**: Check out the other notebooks in this folder

### Useful Commands

```python
# Read from MinIO
df = spark.read.parquet("s3a://bucket/path")

# Write to MinIO
df.write.mode("overwrite").parquet("s3a://bucket/path")

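# Stream from Kafka (Structured Streaming); requires the spark-sql-kafka-0-10
# package built for your Spark and Scala versions
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "topic_name") \
    .load()
# Nothing runs until a sink is started, e.g. stream_df.writeStream.format("console").start()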
```
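
The Kafka source delivers `key` and `value` as binary columns, and in Spark you would typically parse `value` with `from_json(col("value").cast("string"), schema)`. A plain-Python sketch of the per-record step for messages shaped like those in section 8; `FIELDS` and `decode_value` are hypothetical, and absent fields become `None`, much as they become nulls in Spark.

```python
import json

# Fields of the hypothetical message schema from section 8
FIELDS = ("transaction_id", "customer_id", "product_name", "total_amount", "region")


def decode_value(raw):
    """Decode one Kafka value (bytes) into a dict restricted to FIELDS; absent fields are None."""
    payload = json.loads(raw.decode("utf-8"))
    return {name: payload.get(name) for name in FIELDS}


record = decode_value(b'{"transaction_id":"TRX000001","customer_id":"CUST0042","total_amount":99.5}')
print(record["customer_id"], record["region"])  # CUST0042 None
```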

Happy Data Engineering! 🚀