Step 1: Load the Data

In [49]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [50]:
spark = SparkSession.builder \
    .appName("Customer Churn Prediction") \
    .getOrCreate()

In [51]:
 schema = StructType() \
    .add("customerID",StringType(),True) \
    .add("tenure",IntegerType(),True) \
	.add("PhoneService",StringType(),True) \
	.add("Contract",StringType(),True) \
	.add("PaperlessBilling",StringType(),True) \
	.add("PaymentMethod",StringType(),True) \
	.add("MonthlyCharges",DoubleType(),True) \
	.add("TotalCharges",DoubleType(),True) \
	.add("Churn",StringType(),True) \
    .add("ChurnInt",IntegerType(),True)

data = spark.read.csv("data/customer_churn.csv", header=True, inferSchema=True)

In [52]:
data = spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("data/customer_churn.csv")

In [53]:
data.printSchema()
data.show(5)

root
 |-- customerID: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)
 |-- ChurnInt: integer (nullable = true)

+----------+------+------------+--------------+----------------+--------------------+--------------+------------+-----+--------+
|customerID|tenure|PhoneService|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|ChurnInt|
+----------+------+------------+--------------+----------------+--------------------+--------------+------------+-----+--------+
|7590-VHVEG|     1|          No|Month-to-month|             Yes|    Electronic check|         29.85|       29.85|   No|       0|
|5575-GNVDE|    34|         Yes|      One year|     

Step 2: Data Cleaning and Preprocessing

In [54]:
data_cleaned = data.dropna()

In [55]:
from pyspark.ml.feature import StringIndexer

In [56]:
indexer = StringIndexer(inputCol="PaymentMethod", outputCol="PaymentMethodIndex")
data_indexed = indexer.fit(data_cleaned).transform(data_cleaned)

In [57]:
data_indexed.show(5)

+----------+------+------------+--------------+----------------+--------------------+--------------+------------+-----+--------+------------------+
|customerID|tenure|PhoneService|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|ChurnInt|PaymentMethodIndex|
+----------+------+------------+--------------+----------------+--------------------+--------------+------------+-----+--------+------------------+
|7590-VHVEG|     1|          No|Month-to-month|             Yes|    Electronic check|         29.85|       29.85|   No|       0|               0.0|
|5575-GNVDE|    34|         Yes|      One year|              No|        Mailed check|         56.95|      1889.5|   No|       0|               1.0|
|3668-QPYBK|     2|         Yes|Month-to-month|             Yes|        Mailed check|         53.85|      108.15|  Yes|       1|               1.0|
|7795-CFOCW|    45|          No|      One year|              No|Bank transfer (au...|          42.3|     1840.75

Step 3: Feature Engineering

In [58]:
from pyspark.ml.feature import VectorAssembler

In [59]:
assembler = VectorAssembler(
    inputCols=["tenure", "MonthlyCharges", "TotalCharges", "PaymentMethodIndex"],
    outputCol="features"
)

In [60]:
data_prepared = assembler.transform(data_indexed)

In [61]:
data_prepared.select("features", "ChurnInt").show(5)

+--------------------+--------+
|            features|ChurnInt|
+--------------------+--------+
|[1.0,29.85,29.85,...|       0|
|[34.0,56.95,1889....|       0|
|[2.0,53.85,108.15...|       1|
|[45.0,42.3,1840.7...|       0|
|[2.0,70.7,151.65,...|       1|
+--------------------+--------+
only showing top 5 rows



In [65]:
from pyspark.ml.classification import LogisticRegression

# Initialize the Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="ChurnInt")

# Train the model
model = lr.fit(data_prepared)

# Make predictions on the dataset
predictions = model.transform(data_prepared)

# Show the predictions
predictions.select("ChurnInt", "prediction", "probability").show(5)

24/10/24 16:45:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/24 16:45:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


+--------+----------+--------------------+
|ChurnInt|prediction|         probability|
+--------+----------+--------------------+
|       0|       0.0|[0.62498710862861...|
|       0|       0.0|[0.85789932399678...|
|       1|       0.0|[0.53329796119791...|
|       0|       0.0|[0.95948201485725...|
|       1|       1.0|[0.35341774319420...|
+--------+----------+--------------------+
only showing top 5 rows



In [66]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="ChurnInt", metricName="areaUnderROC")

# Evaluate the model
roc_auc = evaluator.evaluate(predictions)
print(f"Area under ROC curve: {roc_auc}")

Area under ROC curve: 0.8145252878162288
