In [None]:
from pyspark.sql import SparkSession

# Smoke test: build (or reuse) a Spark session with master "local[*]" and
# print its version to confirm that PySpark is installed and can start.
session_builder = SparkSession.builder.appName("TestSpark").master("local[*]")
spark = session_builder.getOrCreate()

print(spark.version)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, weekofyear, month
import os

# Directory that receives every artifact written by this notebook.
output_dir = os.path.join(os.getcwd(), "outputs", "pyspark")
os.makedirs(output_dir, exist_ok=True)
print(f"Outputs will be saved to: {output_dir}\n")

# Obtain the Spark session (getOrCreate reuses a session that is already active).
spark = SparkSession.builder.appName("WalmartSalesForecasting").getOrCreate()

# Read the three input tables; each has a header row and Spark infers column types.
# NOTE(review): train.csv and features.csv are read from a directory of the same
# name (data/train.csv/train.csv) while stores.csv sits directly under data/ --
# presumably the layout left by extracting the archives; confirm.
data_dir = os.path.join(os.getcwd(), "data")
csv_options = {"header": True, "inferSchema": True}
train = spark.read.csv(os.path.join(data_dir, "train.csv", "train.csv"), **csv_options)
features = spark.read.csv(os.path.join(data_dir, "features.csv", "features.csv"), **csv_options)
stores = spark.read.csv(os.path.join(data_dir, "stores.csv"), **csv_options)

# Left joins: every training row is kept, enriched first with the per-store,
# per-date features and then with the per-store attributes.
with_features = train.join(features, on=["Store", "Date"], how="left")
df = with_features.join(stores, on="Store", how="left")

# Parse 'Date' with the yyyy-MM-dd pattern, then derive the calendar columns
# from the parsed value.
df = (
    df.withColumn("Date", to_date("Date", "yyyy-MM-dd"))
    .withColumn("Month", month("Date"))
    .withColumn("Week", weekofyear("Date"))
)

df.show(5)

KeyboardInterrupt: 

## Data Validation (PySpark)

In [None]:
"""
PySpark Data Validation Layer
Scalable validation checks for large datasets
"""

from pyspark.sql.functions import col, count, when, isnan
import sys
from io import StringIO

# Report banner: each line is echoed to stdout and also becomes one of the
# opening lines of `validation_output`, the list later written out as the
# validation report.
banner_rule = "=" * 60
validation_output = []
for banner_line in (banner_rule, "PYSPARK DATA VALIDATION CHECKS", banner_rule):
    print(banner_line)
    validation_output.append(banner_line)

# 1. Schema Validation
# The section heading goes to both stdout and the report.
for section_line in ("\n1. SCHEMA VALIDATION", "-" * 40):
    print(section_line)
    validation_output.append(section_line)

# printSchema() prints directly, so the schemas themselves are shown on screen
# only; the report receives just the status line below.
for schema_label, table in (("Train Schema:", train), ("\nStores Schema:", stores)):
    print(schema_label)
    table.printSchema()

schema_status = "✓ Schemas displayed for verification"
print(schema_status)
validation_output.append(schema_status)

# 2. Row Count Validation
# Counts each input table and the merged frame, records the counts in the
# report, and verifies that the merged frame has exactly as many rows as the
# training table.
print("\n2. ROW COUNT VALIDATION")
print("-" * 40)
validation_output.append("\n2. ROW COUNT VALIDATION")
validation_output.append("-" * 40)
train_count = train.count()
features_count = features.count()
stores_count = stores.count()
df_count = df.count()

row_count_lines = [
    f"Train rows: {train_count:,}",
    f"Features rows: {features_count:,}",
    f"Stores rows: {stores_count:,}",
    f"Merged dataset rows: {df_count:,}",
]
for row_count_line in row_count_lines:
    print(row_count_line)
validation_output.extend(row_count_lines)

# Validate join integrity with an explicit raise rather than an `assert`
# statement: asserts are removed when Python runs with -O, which would silently
# disable this check. AssertionError is kept so the failure type is unchanged.
if df_count != train_count:
    raise AssertionError(
        f"Row count mismatch after merge. Expected {train_count}, got {df_count}"
    )
print("✓ Join integrity validated (no row explosion)")
validation_output.append("✓ Join integrity validated (no row explosion)")

# 3. Null Value Check
for section_line in ("\n3. NULL VALUE VALIDATION", "-" * 40):
    print(section_line)
    validation_output.append(section_line)

# One aggregate per key column. when() without otherwise() yields NULL for
# rows that do not match, and count() skips NULLs, so each output cell is the
# number of NULL values in that column.
null_check_columns = ['Store', 'Dept', 'Date', 'Weekly_Sales', 'Type', 'Size']
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in null_check_columns
]
null_counts = df.select(null_count_exprs)
null_counts.show()

# Only this status line reaches the report; the counts are shown on screen.
null_status = "✓ Null counts displayed"
print(null_status)
validation_output.append(null_status)

# 4. Data Quality Checks
# Reports negative sales, distinct store/department counts, the store type
# distribution and the date range covered by the merged dataset.

# Local import: Spark's aggregate min/max are aliased so they do not shadow
# the Python builtins of the same name.
from pyspark.sql.functions import max as spark_max, min as spark_min

print("\n4. DATA QUALITY CHECKS")
print("-" * 40)
validation_output.append("\n4. DATA QUALITY CHECKS")
validation_output.append("-" * 40)

# Check for negative sales
negative_sales = df.filter(col("Weekly_Sales") < 0).count()
print(f"  Negative Weekly_Sales: {negative_sales}")
validation_output.append(f"  Negative Weekly_Sales: {negative_sales}")

# Unique counts
unique_stores = df.select("Store").distinct().count()
unique_depts = df.select("Dept").distinct().count()
print(f"  Unique Stores: {unique_stores}")
print(f"  Unique Departments: {unique_depts}")
validation_output.append(f"  Unique Stores: {unique_stores}")
validation_output.append(f"  Unique Departments: {unique_depts}")

# Store type distribution (the table is shown on screen; the report gets the
# heading only)
print("\nStore Type Distribution:")
validation_output.append("\nStore Type Distribution:")
store_dist = df.groupBy("Type").count().orderBy("Type")
store_dist.show()

# Date range
# The previous form, .agg({"Date": "min", "Date": "max"}), repeated the dict
# key "Date": Python keeps only the last entry, so just the maximum was
# computed. Explicit aggregate expressions return both bounds.
date_strings = df.select(col("Date").cast("string").alias("Date"))
date_range = date_strings.agg(spark_min("Date"), spark_max("Date"))
print("Date Range:")
date_range.show()

# Closing banner: echoed to stdout and appended to the report.
print("\n" + "=" * 60)
print("PYSPARK VALIDATION COMPLETE")
print("=" * 60)
validation_output.append("\n" + "=" * 60)
validation_output.append("PYSPARK VALIDATION COMPLETE")
validation_output.append("=" * 60)

# Save validation report
# The report contains non-ASCII characters ("✓"), so the encoding is pinned to
# UTF-8; relying on the platform default encoding can raise UnicodeEncodeError
# on systems whose locale encoding cannot represent them (e.g. cp1252).
report_path = os.path.join(output_dir, 'pyspark_validation_report.txt')
with open(report_path, 'w', encoding='utf-8') as f:
    f.write('\n'.join(validation_output))
print(f"\n✓ PySpark validation report saved to: {report_path}")

# Save processed data sample: the first 1000 rows are collected to the driver
# as a pandas DataFrame and written as CSV without the index column.
sample_path = os.path.join(output_dir, 'processed_data_sample.csv')
df.limit(1000).toPandas().to_csv(sample_path, index=False)
print("✓ Processed data sample saved to: processed_data_sample.csv")