In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
# At Intact, this session is usually already created for you in a notebook

spark = SparkSession.builder \
    .appName("KMeansFix") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/03 17:50:23 WARN Utils: Your hostname, aduu-ThinkPad-P14s-Gen-4, resolves to a loopback address: 127.0.1.1; using 192.168.1.10 instead (on interface wlp0s20f3)
26/01/03 17:50:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/03 17:50:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/03 17:50:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
26/01/03 17:50:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
df = spark.read.csv("telecom_churn_data.csv",header=True,inferSchema=True)
print(df.show())
print(f"number of rows:{df.count()}")

                                                                                

+-----------+---+------------+--------------+--------------+-------------+------+-----+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|
+-----------+---+------------+--------------+--------------+-------------+------+-----+
|          1| 56|       180.6|         540.9|Month-to-Month|            3|     0|    1|
|          2| 69|      105.41|         266.8|Month-to-Month|            0|     0|    0|
|          3| 46|       91.48|         412.1|      Two Year|            1|     1|    0|
|          4| 32|       45.06|         966.2|      One Year|            1|     0|    0|
|          5| 60|      139.03|         580.4|      Two Year|            2|     0|    1|
|          6| 25|      171.36|          25.8|      Two Year|            0|     0|    0|
|          7| 78|      173.61|         970.7|      Two Year|            1|     1|    0|
|          8| 38|       39.03|         687.6|      One Year|            2|     0|    0|
|          9| 56|       110.8|  

In [3]:
# churned Customer dataframe:
from pyspark.sql.functions import col
churnedDf = df.groupBy(col("Churn")).count()
print(churnedDf.show())

contractTypeDf = df.groupBy(col("Contract_Type")).count()
print(contractTypeDf.show())


                                                                                

+-----+-------+
|Churn|  count|
+-----+-------+
|    1| 607088|
|    0|4392912|
+-----+-------+

None
+--------------+-------+
| Contract_Type|  count|
+--------------+-------+
|Month-to-Month|1666160|
|      One Year|1667487|
|      Two Year|1666353|
+--------------+-------+

None


                                                                                

### KMeans for this record:

In [4]:
print(df.show(2))

+-----------+---+------------+--------------+--------------+-------------+------+-----+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|
+-----------+---+------------+--------------+--------------+-------------+------+-----+
|          1| 56|       180.6|         540.9|Month-to-Month|            3|     0|    1|
|          2| 69|      105.41|         266.8|Month-to-Month|            0|     0|    0|
+-----------+---+------------+--------------+--------------+-------------+------+-----+
only showing top 2 rows
None


In [5]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

#preprocessing contact type:
indexer = StringIndexer(inputCol='Contract_Type',outputCol='Contract_Type_Index')
indexed_df = indexer.fit(df).transform(df)
# print(indexed_df.show())

#applying one hot encoding:
encoder = OneHotEncoder(inputCol='Contract_Type_Index', outputCol='Contract_Vector')
encoded_df = encoder.fit(indexed_df).transform(indexed_df)
print(encoded_df.show())



                                                                                

+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|Contract_Type_Index|Contract_Vector|
+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+
|          1| 56|       180.6|         540.9|Month-to-Month|            3|     0|    1|                2.0|      (2,[],[])|
|          2| 69|      105.41|         266.8|Month-to-Month|            0|     0|    0|                2.0|      (2,[],[])|
|          3| 46|       91.48|         412.1|      Two Year|            1|     1|    0|                1.0|  (2,[1],[1.0])|
|          4| 32|       45.06|         966.2|      One Year|            1|     0|    0|                0.0|  (2,[0],[1.0])|
|          5| 60|      139.03|         580.4|      Two Year|            2|     0|    1|                1.0|  (2,[1],[1.0])|
|       

In [6]:
# applying vectorAssembler:
# encoded_df = encoded_df.drop("Contract_Type_Index")
columnsArr = encoded_df.columns
print(columnsArr)
assembler = VectorAssembler(inputCols=['Age', 'Monthly_Bill', 'Total_Usage_GB', 'Support_Calls', 'Gender', 'Churn', 'Contract_Vector'],
                            outputCol='features')
preprocessedDf = assembler.transform(encoded_df)
print(preprocessedDf.show())

['Customer_ID', 'Age', 'Monthly_Bill', 'Total_Usage_GB', 'Contract_Type', 'Support_Calls', 'Gender', 'Churn', 'Contract_Type_Index', 'Contract_Vector']
+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|Contract_Type_Index|Contract_Vector|            features|
+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+
|          1| 56|       180.6|         540.9|Month-to-Month|            3|     0|    1|                2.0|      (2,[],[])|[56.0,180.6,540.9...|
|          2| 69|      105.41|         266.8|Month-to-Month|            0|     0|    0|                2.0|      (2,[],[])|(8,[0,1,2],[69.0,...|
|          3| 46|       91.48|         412.1|      Two Year|            1|     1|    0|                1.0|  (2,[1],[1.0])|

In [7]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures')
finalData = scaler.fit(preprocessedDf).transform(preprocessedDf)
print(finalData.show())



+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+--------------------+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|Contract_Type_Index|Contract_Vector|            features|      scaledFeatures|
+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+--------------------+
|          1| 56|       180.6|         540.9|Month-to-Month|            3|     0|    1|                2.0|      (2,[],[])|[56.0,180.6,540.9...|[3.13001991747009...|
|          2| 69|      105.41|         266.8|Month-to-Month|            0|     0|    0|                2.0|      (2,[],[])|(8,[0,1,2],[69.0,...|(8,[0,1,2],[3.856...|
|          3| 46|       91.48|         412.1|      Two Year|            1|     1|    0|                1.0|  (2,[1],[1.0])|[46.0,91.48,412.1...|[2.57108778935043...|
|   

                                                                                

In [12]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

trainData,testData = finalData.randomSplit([0.8,0.2])
print(f"Size of trainData: {trainData.count()} & Test Data: {testData.count()}")



Size of trainData: 4001697 & Test Data: 998303


                                                                                

In [13]:
lr = LogisticRegression(featuresCol='scaledFeatures',labelCol='Churn')
lrModel = lr.fit(trainData)
print("Model Fitting done!")

                                                                                

Model Fitting done!


In [17]:
predictions = lrModel.transform(testData)
print(predictions.show())

evaluator = BinaryClassificationEvaluator(labelCol='Churn',metricName='areaUnderROC')
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy (ROC):{accuracy}")


                                                                                

+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+
|Customer_ID|Age|Monthly_Bill|Total_Usage_GB| Contract_Type|Support_Calls|Gender|Churn|Contract_Type_Index|Contract_Vector|            features|      scaledFeatures|       rawPrediction|         probability|prediction|
+-----------+---+------------+--------------+--------------+-------------+------+-----+-------------------+---------------+--------------------+--------------------+--------------------+--------------------+----------+
|          5| 60|      139.03|         580.4|      Two Year|            2|     0|    1|                1.0|  (2,[1],[1.0])|[60.0,139.03,580....|[3.35359276871795...|[-18.253158567715...|[1.18237149646517...|       1.0|
|          7| 78|      173.61|         970.7|      Two Year|            1|     1|    0|                1.0|  (2,[1],[1.0])|[

                                                                                

Model Accuracy (ROC):0.9999998329564607
