## ETL (Extract, Transform, Load) + Data Cleaning

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

### Start SparkSession

In [None]:
# Obtain the Spark session for this notebook under the application name "ChurnETL";
# getOrCreate() hands back an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("ChurnETL")
    .getOrCreate()
)

### Load the dataset

In [None]:
# Read the Telco churn CSV: the first line supplies the column names and
# Spark infers each column's type from the data.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("WA_Fn-UseC_-Telco-Customer-Churn.csv")

### Clean TotalCharges column

In [None]:
# TotalCharges entries that are exactly a single space are turned into nulls;
# every other value is cast to a number. The cast targets "double" rather than
# "float" so the charge amounts are not rounded to 32-bit precision.
total_charges = col("TotalCharges")
df = df.withColumn(
    "TotalCharges",
    when(total_charges == " ", None).otherwise(total_charges.cast("double")),
)
# Remove every row that contains a null in any column (including the
# TotalCharges nulls produced above).
df = df.dropna()

### Create binary label column from Churn

In [None]:
# Encode the target: 1 when Churn equals "Yes", 0 for anything else.
is_churned = col("Churn") == "Yes"
df = df.withColumn("label", when(is_churned, 1).otherwise(0))

### Drop irrelevant columns

In [None]:
# customerID is removed together with Churn, whose information now lives
# in the numeric "label" column.
columns_to_drop = ["customerID", "Churn"]
df = df.drop(*columns_to_drop)

### Save cleaned DataFrame to Parquet

In [None]:
# Persist the cleaned data as Parquet, replacing any previous output at this path.
df.write.parquet("cleaned_churn.parquet", mode="overwrite")

### Show the first rows of the cleaned DataFrame

In [None]:
# Print the first five rows as a quick sanity check of the cleaned data.
df.show(n=5)

+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|label|
+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|         No|             No|Month-to-month|     

## Feature Engineering + ML Model Training

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

### Identify categorical columns

In [None]:
# Every column whose Spark dtype is 'string' is treated as categorical.
categorical_cols = [
    name
    for name, spark_type in df.dtypes
    if spark_type == 'string'
]

### Index each categorical column

In [None]:
# For each categorical column, fit a StringIndexer on the current DataFrame and
# append the resulting numeric column as "<column>_index".
for categorical in categorical_cols:
    indexed_name = categorical + "_index"
    fitted_indexer = StringIndexer(inputCol=categorical, outputCol=indexed_name).fit(df)
    df = fitted_indexer.transform(df)

### Define input features (numeric + indexed categorical)

In [None]:
# Model inputs are all columns except the raw categorical (string) columns and
# the target "label"; original column order is preserved.
# The loop variable is deliberately not called `col`, so it cannot be confused
# with the imported pyspark.sql.functions.col, and the exclusion set is built
# once instead of once per column.
excluded_cols = set(categorical_cols) | {"label"}
feature_cols = [name for name in df.columns if name not in excluded_cols]

### Assemble features into a single vector column

In [None]:
# Combine all feature columns into one vector column named "features".
assembler = VectorAssembler()
assembler.setInputCols(feature_cols)
assembler.setOutputCol("features")
df = assembler.transform(df)

### Train-test split

In [None]:
# Random 80/20 split into training and test sets; the seed is fixed at 42.
split_weights = [0.8, 0.2]
train_df, test_df = df.randomSplit(split_weights, seed=42)

### Train logistic regression model

In [None]:
# Configure a logistic regression that reads the "features" vector and the
# "label" target, then fit it on the training split.
lr = LogisticRegression()
lr.setFeaturesCol("features")
lr.setLabelCol("label")
model = lr.fit(train_df)

### Make predictions

In [None]:
# Apply the fitted model to the held-out test split.
predictions = model.transform(dataset=test_df)

### Evaluate using AUC (Area Under the ROC Curve)

In [56]:
# Score the test-set predictions by the area under the ROC
# (Receiver Operating Characteristic) curve, using "label" as ground truth.
evaluator = (
    BinaryClassificationEvaluator()
    .setLabelCol("label")
    .setMetricName("areaUnderROC")
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.4f}")

AUC: 0.8555
