# 📊 Customer Churn Prediction using Apache Spark (PySpark)

This project demonstrates an **end-to-end machine learning pipeline** built with **Apache Spark (PySpark)**.  
It analyzes customer behavior data to predict churn with a logistic regression model, covering data cleaning, feature engineering, model tuning, and export of the final results.

---

## 🚀 Project Highlights

- Built entirely with **Apache Spark via PySpark**
- Handles **large-scale tabular data**
- End-to-end ML pipeline using **Spark MLlib**
- Includes:
  - Data cleaning
  - Feature engineering (indexing + assembling)
  - Model training using logistic regression
  - Cross-validation with hyperparameter tuning
  - Churn rate analysis by contract type
  - Saving prediction outputs as CSV

---

## 🛠️ Technologies Used

- Apache Spark 3.x
- PySpark (DataFrame API + MLlib)
- Logistic Regression (binary classification)
- CrossValidator for model tuning
- CSV output that can be read with Pandas or any other CSV tool, locally or in the cloud

---

## 📁 Folder Structure / Output Explanation

Results are stored as Spark CSV part files under `output/`, one folder per result:

### `output/predictions/`
- Contains a file like:  
  `part-00000-<uuid>.csv`
- **Includes:** model predictions and churn probabilities for each customer.
- Columns:
  - `customerID`: Unique identifier
  - `prediction`: Churn prediction (0 = No, 1 = Yes)
  - `churn_probability`: Probability of churn (from logistic regression)

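### `output/churn_rate_by_contract/`
- Contains a file like:  
  `part-00000-<uuid>.csv`
- **Includes:** churn rate aggregated by contract type (the `churn_rate` DataFrame from Step 10).
- Helps answer: *Which customer contracts have the highest churn rate?*
- Note: the notebook cells in this README display this table with `show()` but do not write it; a `write.csv` call on `churn_rate` is needed to produce this folder.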
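
Spark writes each result as a folder of `part-*.csv` files next to marker files such as `_SUCCESS`, so the output is not a single CSV. The following is a minimal sketch, using only the Python standard library, of loading such a folder back into memory. The helper name `read_spark_csv_dir` is hypothetical, and it assumes each part file starts with a header row (as produced by `header=True`).

```python
import csv
import glob
import os


def read_spark_csv_dir(folder):
    """Read every part-*.csv file in a Spark output folder into a list of dicts."""
    rows = []
    # Sort for a deterministic order; the pattern skips marker files such as _SUCCESS.
    for path in sorted(glob.glob(os.path.join(folder, "part-*.csv"))):
        with open(path, newline="") as handle:
            # Each part file carries its own header row when written with header=True.
            rows.extend(csv.DictReader(handle))
    return rows
```

For example, `read_spark_csv_dir("output/predictions")` would return one dict per customer, with all values as strings.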

---

## 📦 How to Run

### 🔹 Option 1: On Google Colab (Recommended for Beginners)

1. Upload this notebook to Colab
2. Upload your dataset (e.g., `Telco-Customer-Churn.csv`, which the notebook reads from `/content/`)
3. Install PySpark:
   ```bash
   !pip install pyspark
   ```

```python
# Imports used by the steps below
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
```

```python
# Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("CustomerBehaviorAnalytics") \
    .getOrCreate()
```

```python
# Step 2: Load Dataset (CSV Format)
df = spark.read.csv("/content/Telco-Customer-Churn.csv", header=True, inferSchema=True)

# Step 3: Data Cleaning (drops every row that contains a null value)
cleaned_df = df.dropna()

# Step 4: Basic Data Exploration
cleaned_df.printSchema()
cleaned_df.select("gender", "SeniorCitizen", "MonthlyCharges").show(5)
```

```text
root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)

+------+-------------+--------------+
|gender|SeniorCitizen|MonthlyCharges|
+----
```
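
The schema reports `TotalCharges` as a string, which usually means some rows hold a non-numeric value such as a blank. Those rows are not null, so `dropna()` leaves them in place. The notebook's feature list does not use this column, so the following is only an optional sketch for anyone who wants it as a numeric feature. The helper name `clean_total_charges` is hypothetical, and it assumes ANSI mode is off (the Spark 3.x default), so that a failed cast yields null instead of raising an error.

```python
def clean_total_charges(df, column="TotalCharges"):
    """Cast a string column to double and drop the rows that fail to parse.

    `df` is assumed to be a PySpark DataFrame such as the notebook's `df`.
    """
    # With ANSI mode off, casting a non-numeric string (for example a blank)
    # to double produces null rather than an exception.
    numeric_df = df.withColumn(column, df[column].cast("double"))
    # Remove only the rows where this one column could not be parsed.
    return numeric_df.dropna(subset=[column])
```

It could be applied as `cleaned_df = clean_total_charges(df).dropna()` in place of the plain `dropna()` call.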

```python
# Step 5: Feature Engineering
# Binary label: 1 if the customer churned, 0 otherwise
processed_df = cleaned_df.withColumn("ChurnFlag", when(col("Churn") == "Yes", 1).otherwise(0))

# Encode categorical features as numeric indices
indexers = [
    StringIndexer(inputCol=column, outputCol=column+"_Index")
    for column in ["gender", "InternetService", "Contract", "PaymentMethod"]
]

for indexer in indexers:
    processed_df = indexer.fit(processed_df).transform(processed_df)

# Assemble features into a single vector column
assembler = VectorAssembler(
    inputCols=["MonthlyCharges", "tenure", "gender_Index", "InternetService_Index", "Contract_Index", "PaymentMethod_Index"],
    outputCol="features"
)
assembled_df = assembler.transform(processed_df)
```
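
By default, `StringIndexer` orders labels by descending frequency, so the most common category receives index `0.0`. The pure-Python sketch below mimics that ordering to show what the `*_Index` columns contain. The function name `frequency_index` is hypothetical, the sample values are made up rather than taken from the dataset, and the alphabetical tie-break reflects Spark 3.0+ behavior as I understand it.

```python
from collections import Counter


def frequency_index(values):
    """Map each label to an index by descending frequency (ties: alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Illustrative values only, not counts from the real dataset.
sample = [
    "Month-to-month", "Two year", "Month-to-month",
    "One year", "Two year", "Month-to-month",
]
print(frequency_index(sample))
# {'Month-to-month': 0.0, 'Two year': 1.0, 'One year': 2.0}
```

The indices are arbitrary codes, yet logistic regression treats them as ordered numbers. Passing the indexed columns through `OneHotEncoder` before assembling is a common way to avoid that implied ordering.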

```python
# Step 6: MLlib Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="ChurnFlag")
model = lr.fit(assembled_df)
# Note: predictions are made on the same rows the model was trained on
predictions = model.transform(assembled_df)
```
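
Because Step 6 fits and scores on the same rows, any metric computed from `predictions` is a training metric. The sketch below shows one way to hold out a test split with `randomSplit`. The helper name `fit_with_holdout`, the 80/20 split, and the seed are illustrative assumptions rather than part of the original notebook.

```python
def fit_with_holdout(estimator, data, evaluator, train_fraction=0.8, seed=42):
    """Fit on a training split and score on rows the model has not seen.

    `estimator`, `data`, and `evaluator` are assumed to be objects like the
    notebook's `lr`, `assembled_df`, and a BinaryClassificationEvaluator.
    """
    # randomSplit returns one DataFrame per weight; the seed makes it repeatable.
    train_df, test_df = data.randomSplit(
        [train_fraction, 1.0 - train_fraction], seed=seed
    )
    fitted = estimator.fit(train_df)
    # Evaluate only on the held-out rows.
    score = evaluator.evaluate(fitted.transform(test_df))
    return fitted, score
```

A held-out score is usually somewhat lower than the training score, and the gap gives a rough sense of overfitting.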

```python
# Step 7: Evaluation (the default metric is area under the ROC curve)
evaluator = BinaryClassificationEvaluator(labelCol="ChurnFlag")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc:.2f}")
```

```text
AUC: 0.83
```
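
One way to read an AUC value: it is the probability that a randomly chosen churner receives a higher score than a randomly chosen non-churner. The pure-Python sketch below computes AUC directly from that definition on a few made-up scores. The function name `pairwise_auc` is hypothetical, and Spark derives the value from the ROC curve rather than by enumerating pairs, so this illustrates the meaning of the number, not Spark's implementation.

```python
def pairwise_auc(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative label")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores for illustration.
print(pairwise_auc([1, 0, 1, 0], [0.9, 0.4, 0.35, 0.2]))  # 0.75
```

Here three of the four churner/non-churner pairs are ordered correctly, giving 0.75. A value of 0.5 corresponds to random ranking.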


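```python
# Step 8: Cross Validation with Param Grid
# Three regParam values are each scored over three folds
grid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).build()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)
cv_model = cv.fit(assembled_df)
# transform() uses the best model refit on all rows, so this AUC is measured
# on the training data; the per-fold averages are in cv_model.avgMetrics
cv_predictions = cv_model.transform(assembled_df)
cv_auc = evaluator.evaluate(cv_predictions)
print(f"Cross-Validated AUC: {cv_auc:.2f}")
```

```text
Cross-Validated AUC: 0.83
```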
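
The value printed in Step 8 comes from scoring the refit best model on the full dataset. The averages that `CrossValidator` actually used to pick the winner are stored in `cv_model.avgMetrics`, one per parameter map and in the same order as the grid. The sketch below pairs each parameter setting with its mean fold metric; the helper name `summarize_cv` is hypothetical.

```python
def summarize_cv(cv_model):
    """Pair each parameter combination with its mean metric across folds.

    `cv_model` is assumed to be a fitted CrossValidatorModel.
    """
    results = []
    # avgMetrics is ordered like the param maps that were passed to CrossValidator.
    pairs = zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics)
    for param_map, metric in pairs:
        settings = {param.name: value for param, value in param_map.items()}
        results.append((settings, metric))
    # Highest metric first, which suits evaluators where larger is better (such as AUC).
    return sorted(results, key=lambda item: item[1], reverse=True)
```

Calling `summarize_cv(cv_model)` after Step 8 would list the three `regParam` values with their average AUC over the folds.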


```python
# Step 9: Aggregation - Avg Monthly Charges by Internet Service
agg_df = processed_df.groupBy("InternetService") \
    .agg(avg("MonthlyCharges").alias("AvgMonthlyCharge"))
agg_df.show()
```

```text
+---------------+------------------+
|InternetService|  AvgMonthlyCharge|
+---------------+------------------+
|    Fiber optic| 91.50012919896615|
|             No|21.079193971166454|
|            DSL| 58.10216852540261|
+---------------+------------------+
```



```python
# Step 10: Churn Rate by Contract Type
# The mean of the 0/1 ChurnFlag is the share of customers who churned
churn_rate = processed_df.groupBy("Contract") \
    .agg(avg("ChurnFlag").alias("ChurnRate"))
churn_rate.orderBy("ChurnRate", ascending=False).show()
```

```text
+--------------+-------------------+
|      Contract|          ChurnRate|
+--------------+-------------------+
|Month-to-month| 0.4270967741935484|
|      One year|0.11269517990495587|
|      Two year|0.02831858407079646|
+--------------+-------------------+
```
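
Step 10 only displays the churn rate table. To get the `churn_rate_by_contract` folder described in the output section, the DataFrame has to be written out. The sketch below is one possible way to do that. The helper name `write_single_csv` is hypothetical, and both `coalesce(1)` and overwrite mode are choices made for this example.

```python
def write_single_csv(df, path):
    """Write a small PySpark DataFrame as one headered CSV part file under `path`."""
    # coalesce(1) funnels all rows into one partition, so Spark emits one part file.
    # This suits tiny aggregates such as three contract types, not large tables.
    (
        df.coalesce(1)
        .write.mode("overwrite")  # replace the folder if it already exists
        .csv(path, header=True)
    )
```

Usage would look like `write_single_csv(churn_rate, "output/churn_rate_by_contract")`.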



```python
# Step 11: Export predictions with the churn probability
from pyspark.ml.functions import vector_to_array

# The probability column is an ML vector [P(no churn), P(churn)].
# vector_to_array (Spark 3.0+) turns it into an array so element 1
# can be selected without a Python UDF.
predictions_with_score = predictions.withColumn(
    "churn_probability", vector_to_array(col("probability"))[1]
)

predictions_with_score.select("customerID", "prediction", "churn_probability") \
    .write.csv("output/predictions", header=True)
```
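
The cells above export predictions but do not persist the trained model. If the model should be kept as an artifact as well, a sketch along the following lines could work. The helper name `save_best_model` and any path passed to it are hypothetical; `bestModel`, `write()`, `overwrite()`, and `save()` are the standard MLlib persistence calls.

```python
def save_best_model(cv_model, path):
    """Persist the best model found by cross-validation to `path`.

    `cv_model` is assumed to be the fitted CrossValidatorModel from Step 8.
    """
    # bestModel is the estimator refit with the winning parameters.
    # write().overwrite() replaces any model already saved at this path.
    cv_model.bestModel.write().overwrite().save(path)
```

A model saved this way can later be restored with `LogisticRegressionModel.load(path)` from `pyspark.ml.classification`. It must be called before `spark.stop()`.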


```python
# Stop Spark
spark.stop()
```