# Apache Spark Data Processing

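This notebook demonstrates Big Data processing using Apache Spark. Only the setup cell runs as-is: the remaining cells are commented-out templates, so replace the placeholder file and column names with your own and uncomment them before running.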

## 1. Setup Spark Session

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import sys
from pathlib import Path

# Add src to path so the project-local `utils` package can be imported.
# The membership check keeps the cell idempotent: re-running it in the same
# kernel no longer appends a duplicate '../src' entry to sys.path.
# Note: the entry is relative, so it is resolved against the kernel's current
# working directory at import time.
if '../src' not in sys.path:
    sys.path.append('../src')

# Project-level data directories and Spark settings
# (presumably defined in ../src/utils/config.py — verify against the repo layout)
from utils.config import RAW_DATA_DIR, PROCESSED_DATA_DIR, SPARK_CONFIG

# Build the notebook's Spark session, or reuse one that is already running.
# NOTE(review): SPARK_CONFIG is imported in this cell but not used here — confirm
# whether the memory settings below should be read from it instead.
spark = (
    SparkSession.builder
    .appName("BigDataAnalysis")
    .master("local[*]")  # local mode, one worker thread per available core
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .getOrCreate()  # returns the existing session if one is active
)

# Report which Spark build is in use and where its web UI is served
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

## 2. Load Data with Spark

In [None]:
# Load CSV data
# Template: replace 'your_data.csv' with a file located under RAW_DATA_DIR, then uncomment.
# header=True takes column names from the first line; inferSchema=True makes Spark
# read the data an extra time to guess column types (without it, columns load as strings).
# df = spark.read.csv(str(RAW_DATA_DIR / 'your_data.csv'), 
#                     header=True, 
#                     inferSchema=True)

# Show first few rows
# df.show(5)

In [None]:
# Display schema: prints each column's name and data type
# (requires `df` from the load cell)
# df.printSchema()

In [None]:
# Basic statistics: describe() reports count, mean, stddev, min and max per column
# (requires `df` from the load cell)
# df.describe().show()

## 3. Data Transformations

In [None]:
# Select columns
# 'column1', 'column2', 'column3' are placeholders — substitute real column names from `df`.
# df_selected = df.select('column1', 'column2', 'column3')
# df_selected.show(5)

In [None]:
# Filter rows: keep only rows whose 'column1' value is greater than 100
# ('column1' and the threshold are placeholders)
# df_filtered = df.filter(F.col('column1') > 100)
# count() is an action, so this line is what actually runs the filter
# print(f"Filtered rows: {df_filtered.count()}")

In [None]:
# Add new column: 'new_column' holds 'column1' multiplied by 2.
# withColumn returns a new DataFrame; `df` itself is left unchanged.
# `df_transformed` is reused by the save and explain cells further down.
# df_transformed = df.withColumn('new_column', F.col('column1') * 2)
# df_transformed.show(5)

## 4. Aggregations

In [None]:
# Group by aggregation: for each distinct value of 'category_column', compute the
# row count plus the mean and sum of 'numeric_column'.
# Both column names are placeholders — substitute real columns from `df`.
# df_grouped = df.groupBy('category_column').agg(
#     F.count('*').alias('count'),
#     F.mean('numeric_column').alias('mean_value'),
#     F.sum('numeric_column').alias('total')
# )
# df_grouped.show()

## 5. SQL Queries

In [None]:
# Register DataFrame as SQL table
# The temp view is scoped to this Spark session and replaces any view of the same name.
# df.createOrReplaceTempView('data_table')

# Run SQL query: rows per category, most frequent first
# NOTE: `category` is a placeholder column name; the group-by template in section 4
# uses 'category_column' for the same role — use whichever column exists in your data.
# result = spark.sql("""
#     SELECT category, COUNT(*) as count
#     FROM data_table
#     GROUP BY category
#     ORDER BY count DESC
# """)
# result.show()

## 6. Save Results

In [None]:
# Save as Parquet (efficient columnar format)
# Requires `df_transformed` from the "Add new column" cell.
# Spark writes a directory of part files at the given path, not a single file;
# mode='overwrite' replaces any existing output at that path.
# df_transformed.write.parquet(
#     str(PROCESSED_DATA_DIR / 'processed_data.parquet'),
#     mode='overwrite'
# )

# Save as CSV
# As with Parquet, 'processed_data.csv' will be a directory containing part files;
# header=True writes the column names as the first line of each part file.
# df_transformed.write.csv(
#     str(PROCESSED_DATA_DIR / 'processed_data.csv'),
#     header=True,
#     mode='overwrite'
# )

## 7. Performance Monitoring

In [None]:
# Check execution plan: explain() prints the plan Spark would execute for this
# DataFrame without running it (requires `df_transformed` from the "Add new column" cell)
# df_transformed.explain()

## 8. Cleanup

In [None]:
# Stop Spark session
# Uncomment once finished; after stop() the `spark` object can no longer run jobs,
# so the setup cell must be re-run to continue working.
# spark.stop()