In [None]:
# WEEK 1: SparkScale Churn - Distributed ETL Setup
# Production: Simulates 2TB telecom logs processing
# Harini

!pip install -q pyspark==3.5.0 findspark

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Build (or reuse) the notebook-wide Spark session with adaptive query
# execution and adaptive partition coalescing switched on.
# NOTE(review): no master is set here, so Spark falls back to its default for
# this runtime — presumably local[*] (all CPU cores) in a pip-installed
# notebook environment; confirm.
session_options = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}

session_builder = SparkSession.builder.appName("SparkScale_Churn")
for option_key, option_value in session_options.items():
    session_builder = session_builder.config(option_key, option_value)
spark = session_builder.getOrCreate()

# Keep a handle on the SparkContext and report the default parallelism.
sc = spark.sparkContext
print(f"Spark {pyspark.__version__} ready! Cluster: {sc.defaultParallelism} cores")
print("Partitions ready for 2TB scale")

# Bare last expression so the notebook renders the session summary.
spark


In [None]:
# WEEK 1 CELL 2:
# Harini
from google.colab import files
import os

# Ask the user for the Telco churn CSV through the Colab upload widget.
print("Upload your Telco CSV here:")
uploaded = files.upload()

# A cancelled upload yields an empty mapping; fail with a clear message
# instead of an IndexError on the first-key lookup.
if not uploaded:
    raise ValueError("No file was uploaded; re-run this cell and select the Telco CSV.")

# Only the first uploaded file is loaded; any additional files are ignored.
filename = next(iter(uploaded))
print(f"Auto loading: {filename}")

# NOTE(review): files.upload() is expected to save into the current working
# directory (/content on a default Colab runtime) — confirm. Resolving the path
# from the working directory avoids hard-coding /content.
csv_path = os.path.abspath(filename)

df_raw = spark.read.option("header", "true").csv(csv_path, inferSchema=True)

print(f"Rows: {df_raw.count():,} | Columns: {len(df_raw.columns)}")
df_raw.show(5)
df_raw.printSchema()

# Cleaning steps:
#   1. Strip spaces from TotalCharges and cast to double; values that cannot be
#      parsed (e.g. blank strings) are expected to become null under Spark's
#      default non-ANSI cast behaviour.
#   2. Drop rows whose TotalCharges is null after the cast.
#   3. Encode Churn as 1 for "Yes" and 0 for anything else (including nulls).
# The result is cached because later cells reuse it several times.
df_clean = (
    df_raw
    .withColumn("TotalCharges", regexp_replace("TotalCharges", " ", "").cast("double"))
    .filter(col("TotalCharges").isNotNull())
    .withColumn("Churn", when(col("Churn") == "Yes", 1).otherwise(0))
    .cache()
)

print("\nSample:")
df_clean.select("customerID", "tenure", "MonthlyCharges", "TotalCharges", "Churn").show(10)


In [None]:
# WEEK 1
# Harini

# Number of hash partitions used when redistributing rows by customerID.
TARGET_PARTITIONS = 50

# Compute the churn rate once; it is reported twice below and each evaluation
# would otherwise launch its own Spark job. The aggregate always returns one
# row, whose value is None when df_clean is empty.
churn_rate = df_clean.select(mean("Churn")).first()[0]

print("Current partitions:", df_clean.rdd.getNumPartitions())
print("Churn rate:", churn_rate)

# Hash-partition by customerID and mark the result for caching.
df_opt = df_clean.repartition(TARGET_PARTITIONS, "customerID")
df_opt.cache()
opt_partition_count = df_opt.rdd.getNumPartitions()

print(f" Optimized: {opt_partition_count} partitions")
print("Sample repartitioned data:")
df_opt.show(5)

# Guard the percentage format: formatting None with ':.1%' raises TypeError.
churn_rate_text = f"{churn_rate:.1%}" if churn_rate is not None else "n/a"

# Week1 Production Metric
print("\nPRODUCTION METRICS:")
print(f"• Rows processed: {df_opt.count():,}")
print(f"• Partitions: {opt_partition_count} (scalable)")
print(f"• Churn rate: {churn_rate_text}")
print(f"• Memory optimized: {df_opt.storageLevel}")


In [None]:
# ENHANCEMENT: Data cleaning + class rebalancing (majority-class undersampling, not true SMOTE)

from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import skewness

# Summary statistics of the charge columns before any rows are filtered out.
print("Pre-clean stats:")
df_clean.select("MonthlyCharges", "TotalCharges").describe().show()

# 1. Outlier removal on TotalCharges using IQR fences:
#    keep rows inside [Q1 - k*IQR, Q3 + k*IQR].
IQR_FENCE_MULTIPLIER = 1.5
QUANTILE_RELATIVE_ERROR = 0.05  # approxQuantile tolerance; quartiles are approximate

quantiles = df_clean.approxQuantile(["TotalCharges"], [0.25, 0.75], QUANTILE_RELATIVE_ERROR)
# Fail with a clear message instead of an IndexError when no quartiles come back.
if not quantiles or len(quantiles[0]) < 2:
    raise ValueError("Cannot compute TotalCharges quartiles: no usable values in df_clean.")

q1, q3 = quantiles[0]
iqr = q3 - q1
lower, upper = q1 - IQR_FENCE_MULTIPLIER * iqr, q3 + IQR_FENCE_MULTIPLIER * iqr

df_cleaned = df_clean.filter(col("TotalCharges").between(lower, upper))

print(f"Outliers removed: {df_clean.count() - df_cleaned.count():,} dropped")

# 2. Class rebalancing by undersampling the majority class.
#    This is NOT SMOTE: sampleBy is stratified sampling without replacement
#    from the DataFrame API, so it can only drop rows, never synthesise or
#    duplicate minority rows. Here about half of the Churn=0 rows are kept and
#    all Churn=1 rows are kept, which raises the churn rate but does not
#    guarantee a 50/50 split.
CLASS_SAMPLING_FRACTIONS = {0: 0.5, 1: 1.0}
SAMPLING_SEED = 42

df_balanced = df_cleaned.sampleBy("Churn", fractions=CLASS_SAMPLING_FRACTIONS, seed=SAMPLING_SEED)
# Mark for caching before the first action so the actions below reuse the
# materialised sample.
df_balanced.cache()

# The aggregate yields None on an empty frame; guard the percentage format.
balanced_churn_rate = df_balanced.select(mean("Churn")).first()[0]
balanced_churn_text = f"{balanced_churn_rate:.1%}" if balanced_churn_rate is not None else "n/a"
print(f" Majority class undersampled: Churn rate now {balanced_churn_text}")

df_balanced.groupBy("Churn").count().show()

