In [21]:
from pyspark.sql import SparkSession

# Start a Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("TelcoCustomerChurn")
    .getOrCreate()
)

# Read the churn CSV: the first row supplies the column names and Spark
# infers each column's type from the file contents.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/Telco_Customer_Churn.csv')
)
data.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|  

In [22]:
from pyspark.sql.functions import col, when

# Replace null TotalCharges values with 0.
# NOTE(review): fillna only touches nulls; if TotalCharges was inferred as a
# string column, blank entries are left as-is -- confirm with data.printSchema().
data = data.fillna({'TotalCharges': 0})

# Recode the SeniorCitizen flag as a label: 1 -> 'Yes', everything else -> 'No'.
data = data.withColumn(
    'SeniorCitizen',
    when(col('SeniorCitizen') == 1, 'Yes').otherwise('No'),
)


In [23]:
from pyspark.ml.feature import StringIndexer

# Encode the categorical 'gender' column as a numeric 'gender_index' column.
indexer = StringIndexer(
    inputCol="gender",
    outputCol="gender_index",
)

# Fit the label-to-index mapping on the full dataset and append the new column.
data = indexer.fit(data).transform(data)

data.show()


+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+------------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|gender_index|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+------------+
|7590-VHVEG|Female|   

In [24]:
from pyspark.ml.feature import VectorAssembler

# Combine the model inputs into a single vector column named 'features'.
assembler = VectorAssembler(
    inputCols=[
        "tenure",
        "MonthlyCharges",
        "gender_index",
    ],
    outputCol="features",
)
data = assembler.transform(data)


In [25]:
# Split the rows into train/test partitions with 0.8/0.2 weights; the seed is
# fixed so the split does not change between runs of this cell.
train, test = data.randomSplit(
    weights=[0.8, 0.2],
    seed=42,
)


In [27]:
# LogisticRegression is imported here because no other cell imports it;
# without this import the cell fails with NameError on a fresh kernel.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer to convert the string 'Churn' label into a numeric
# 'Churn_index' column, which is what LogisticRegression takes as its label.
indexer = StringIndexer(inputCol="Churn", outputCol="Churn_index")

# Fit the label mapping on the training split only, then apply that same
# mapping to both splits so train and test share one encoding.
indexer_model = indexer.fit(train)
train = indexer_model.transform(train)
test = indexer_model.transform(test)

# Train logistic regression on the assembled 'features' vector against the
# numeric label column.
lr = LogisticRegression(featuresCol="features", labelCol="Churn_index")
model = lr.fit(train)

In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out split with the trained model.
predictions = model.transform(test)

# Evaluate against the numeric 'Churn_index' label. areaUnderROC is the
# evaluator's default metric; it is spelled out here for readability.
evaluator = BinaryClassificationEvaluator(
    labelCol="Churn_index",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

AUC: 0.8072368274912209


In [30]:
# Persist the fitted model under 'customer_churn_model', replacing any
# previously saved copy at that path.
(
    model.write()
    .overwrite()
    .save("customer_churn_model")
)


In [31]:
from pyspark.ml.classification import LogisticRegressionModel

# Reload the persisted model from disk.
loaded_model = LogisticRegressionModel.load("customer_churn_model")

# Score the test split with the reloaded model and show the predicted classes.
result = loaded_model.transform(test)
(
    result
    .select("prediction")
    .show()
)


+----------+
|prediction|
+----------+
|       1.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       1.0|
|       0.0|
|       1.0|
|       0.0|
+----------+
only showing top 20 rows



In [33]:
from pyspark.sql.functions import col

# Show the original 'Churn' label next to the model's 'prediction' for a
# quick side-by-side check.
(
    result
    .select(
        col("Churn"),
        col("prediction"),
    )
    .show()
)

+-----+----------+
|Churn|prediction|
+-----+----------+
|  Yes|       1.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|  Yes|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|   No|       0.0|
|  Yes|       1.0|
|   No|       0.0|
|  Yes|       1.0|
|  Yes|       0.0|
+-----+----------+
only showing top 20 rows



In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Compute accuracy by comparing 'prediction' with the numeric 'Churn_index'
# label (the indexed form of 'Churn').
evaluator = MulticlassClassificationEvaluator(
    labelCol="Churn_index",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(result)
print(f"Accuracy: {accuracy}")

Accuracy: 0.7888475836431227
