In [None]:
! pip install pyspark -q

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Start (or reuse) the Spark session and load the raw Telco CSV with an inferred schema.
spark = (
    SparkSession.builder
    .appName("TelcoDataProcessing")
    .getOrCreate()
)
telco_df = spark.read.csv("/content/Telco.csv", header=True, inferSchema=True)

# Columns encoded as 1 when the value is exactly 'Yes' and 0 for anything else.
columns_YN = ['Partner', 'Dependents', 'PhoneService', 'OnlineSecurity', 'DeviceProtection',
              'TechSupport', 'StreamingTV', 'StreamingMovies', 'PaperlessBilling', 'Churn']

# Collect every column rewrite first (insertion order = application order),
# then apply them one by one.
column_rewrites = {
    yn_column: when(col(yn_column) == 'Yes', 1).otherwise(0)
    for yn_column in columns_YN
}
# gender: 1 for 'Male', 0 for every other value.
column_rewrites['gender'] = when(col('gender') == 'Male', 1).otherwise(0)
# TotalCharges is cast to float; values that cannot be cast become null.
column_rewrites["TotalCharges"] = col("TotalCharges").cast("float")

for col_name, rewritten in column_rewrites.items():
    telco_df = telco_df.withColumn(col_name, rewritten)

telco_df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+-------------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|       OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+-------------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|     0|            0|      1|         0|     1|           0|No phone service|            DSL|             0|                Yes|     

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Index every categorical column with StringIndexer, producing "<name>_index" columns.
#
# NOTE: 'partner', 'dependents' and 'churn' are spelled in lowercase here. The
# withColumn call below still matches the existing columns, and the displayed
# result carries the lowercase spelling, so the generated names are
# 'partner_index', 'dependents_index' and 'churn_index'. Later cells reference
# exactly those names, so the spelling is kept as-is.
categorical_cols = ['gender', 'SeniorCitizen','partner', 'dependents', 'PhoneService', 'MultipleLines',
                    'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',
                    'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract',
                    'PaperlessBilling', 'PaymentMethod', 'churn']

# StringIndexer input is cast to string first. The loop variable is deliberately
# NOT called `col`: a top-level loop variable stays bound after the loop and would
# overwrite the `col` function imported from pyspark.sql.functions for the kernel.
for column_name in categorical_cols:
    telco_df = telco_df.withColumn(column_name, telco_df[column_name].cast(StringType()))

# handleInvalid="keep" assigns unseen/invalid values an extra index instead of failing.
indexers = [StringIndexer(inputCol=column_name, outputCol=column_name + "_index", handleInvalid="keep")
            for column_name in categorical_cols]

# Fit and apply the indexers one after another, each adding one *_index column.
indexed_df = telco_df
for indexer in indexers:
    indexed_df = indexer.fit(indexed_df).transform(indexed_df)

indexed_df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+-------------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+------------+-------------------+-------------+----------------+------------------+-------------------+---------------------+--------------------+------------------+----------------------+-----------------+-----------------+---------------------+--------------+----------------------+-------------------+-----------+
|customerID|gender|SeniorCitizen|partner|dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|       OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|churn|gender_index|SeniorCitizen_index|partner_index|dependents_index|PhoneService_index|MultipleLines_index|InternetServ

In [None]:
# Remove the original categorical columns listed in categorical_cols; what is
# left is customerID, the numeric columns and the generated *_index columns.
indexed_df_dropped = indexed_df.drop(*categorical_cols)
# Print the remaining columns and their types.
indexed_df_dropped.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- gender_index: double (nullable = false)
 |-- SeniorCitizen_index: double (nullable = false)
 |-- partner_index: double (nullable = false)
 |-- dependents_index: double (nullable = false)
 |-- PhoneService_index: double (nullable = false)
 |-- MultipleLines_index: double (nullable = false)
 |-- InternetService_index: double (nullable = false)
 |-- OnlineSecurity_index: double (nullable = false)
 |-- OnlineBackup_index: double (nullable = false)
 |-- DeviceProtection_index: double (nullable = false)
 |-- TechSupport_index: double (nullable = false)
 |-- StreamingTV_index: double (nullable = false)
 |-- StreamingMovies_index: double (nullable = false)
 |-- Contract_index: double (nullable = false)
 |-- PaperlessBilling_index: double (nullable = false)
 |-- PaymentMethod_index: double (nullable = false)
 |-- chur

In [None]:
# Same frame without the customerID identifier column: keep every other column,
# in its existing order.
columns_without_id = [name for name in indexed_df_dropped.columns if name != "customerID"]
indexed_df_dropped_noCID = indexed_df_dropped.select(*columns_without_id)
indexed_df_dropped_noCID.printSchema()

root
 |-- tenure: integer (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: float (nullable = true)
 |-- gender_index: double (nullable = false)
 |-- SeniorCitizen_index: double (nullable = false)
 |-- partner_index: double (nullable = false)
 |-- dependents_index: double (nullable = false)
 |-- PhoneService_index: double (nullable = false)
 |-- MultipleLines_index: double (nullable = false)
 |-- InternetService_index: double (nullable = false)
 |-- OnlineSecurity_index: double (nullable = false)
 |-- OnlineBackup_index: double (nullable = false)
 |-- DeviceProtection_index: double (nullable = false)
 |-- TechSupport_index: double (nullable = false)
 |-- StreamingTV_index: double (nullable = false)
 |-- StreamingMovies_index: double (nullable = false)
 |-- Contract_index: double (nullable = false)
 |-- PaperlessBilling_index: double (nullable = false)
 |-- PaymentMethod_index: double (nullable = false)
 |-- churn_index: double (nullable = false)



In [None]:
#Decision Tree

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Decision tree on all numeric and indexed features, evaluated on a 30% hold-out split.
spark = (
    SparkSession.builder
    .appName("Decision Tree Churn Prediction")
    .getOrCreate()
)

feature_cols = ['tenure', 'MonthlyCharges', 'TotalCharges', 'gender_index', 'SeniorCitizen_index',
                'partner_index', 'dependents_index', 'PhoneService_index', 'MultipleLines_index',
                'InternetService_index', 'OnlineSecurity_index', 'OnlineBackup_index',
                'DeviceProtection_index', 'TechSupport_index', 'StreamingTV_index',
                'StreamingMovies_index', 'Contract_index', 'PaperlessBilling_index',
                'PaymentMethod_index']

# Every feature becomes a double, with missing values replaced by 0.
for col_name in feature_cols:
    as_double = col(col_name).cast("double")
    indexed_df_dropped = indexed_df_dropped.withColumn(col_name, as_double)
    indexed_df_dropped = indexed_df_dropped.fillna(0, subset=[col_name])

# Pipeline: assemble the feature vector, then fit the tree on churn_index.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
dt = DecisionTreeClassifier(labelCol="churn_index", featuresCol="features")
pipeline = Pipeline(stages=[assembler, dt])

# Seeded 70/30 split.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

model = pipeline.fit(train_data)
predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(
    labelCol="churn_index",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

# Confusion-matrix off-diagonals on the test split.
predicted_churn_but_stayed = (col("prediction") == 1) & (col("churn_index") == 0)
predicted_stay_but_churned = (col("prediction") == 0) & (col("churn_index") == 1)
fp = predictions.filter(predicted_churn_but_stayed).count()
fn = predictions.filter(predicted_stay_but_churned).count()

predictions.select("customerID", "churn_index", "prediction").show()

print("False Positives:", fp)
print("False Negatives:", fn)
print("Accuracy:", accuracy)

+----------+-----------+----------+
|customerID|churn_index|prediction|
+----------+-----------+----------+
|0004-TLHLJ|        1.0|       1.0|
|0013-SMEOE|        0.0|       0.0|
|0015-UOCOJ|        0.0|       0.0|
|0016-QLJIS|        0.0|       0.0|
|0019-EFAEP|        0.0|       0.0|
|0019-GFNTW|        0.0|       0.0|
|0020-INWCK|        0.0|       0.0|
|0023-HGHWL|        1.0|       1.0|
|0023-XUOPT|        1.0|       1.0|
|0030-FNXPP|        0.0|       0.0|
|0031-PVLZI|        1.0|       0.0|
|0042-JVWOJ|        0.0|       0.0|
|0042-RLHYP|        0.0|       0.0|
|0048-LUMLS|        0.0|       0.0|
|0052-DCKON|        0.0|       0.0|
|0056-EPFBG|        0.0|       0.0|
|0057-QBUQH|        0.0|       0.0|
|0064-YIJGF|        0.0|       0.0|
|0071-NDAFP|        0.0|       0.0|
|0074-HDKDG|        0.0|       0.0|
+----------+-----------+----------+
only showing top 20 rows

False Positives: 175
False Negatives: 220
Accuracy: 0.8031888390632785


In [None]:
#Random Forest

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# Rank features by the importance a random forest assigns to them.
spark = SparkSession.builder.appName("FeatureImportance").getOrCreate()

# Every column except the label and the customer identifier is a feature.
non_feature_columns = ['churn_index', 'customerID']
feature_columns = [name for name in indexed_df_dropped.columns if name not in non_feature_columns]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# The label indexer is fitted on the full frame, before the split.
label_indexer_stage = StringIndexer(inputCol="churn_index", outputCol="indexedLabel")
labelIndexer = label_indexer_stage.fit(indexed_df_dropped)

train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")

pipeline = Pipeline(stages=[assembler, labelIndexer, rf])
model = pipeline.fit(train_data)

# The fitted forest is the last pipeline stage.
fitted_forest = model.stages[-1]
importances = fitted_forest.featureImportances

# Pair each importance with its feature name, highest importance first.
feature_importances = sorted(zip(importances, feature_columns), reverse=True)

for rank, (importance, feature_name) in enumerate(feature_importances, start=1):
    print(f"Rank {rank}: {feature_name} - Importance: {importance}")

Rank 1: Contract_index - Importance: 0.2628144465249463
Rank 2: tenure - Importance: 0.20177730974007463
Rank 3: InternetService_index - Importance: 0.17911902268639615
Rank 4: TotalCharges - Importance: 0.10619793339133769
Rank 5: OnlineBackup_index - Importance: 0.05932397091454907
Rank 6: PaymentMethod_index - Importance: 0.05750643334678044
Rank 7: MonthlyCharges - Importance: 0.05701701380535436
Rank 8: PaperlessBilling_index - Importance: 0.02027749660703685
Rank 9: TechSupport_index - Importance: 0.01893204339318858
Rank 10: StreamingMovies_index - Importance: 0.0070134177423893515
Rank 11: OnlineSecurity_index - Importance: 0.006748531634138609
Rank 12: MultipleLines_index - Importance: 0.004883971781073422
Rank 13: SeniorCitizen_index - Importance: 0.004015441773412743
Rank 14: dependents_index - Importance: 0.0036262645459724124
Rank 15: partner_index - Importance: 0.0034467316215871908
Rank 16: PhoneService_index - Importance: 0.0029736327735527743
Rank 17: StreamingTV_index

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col, concat, lit, format_number


# Create the empty results table that the model cells append their metrics to.
spark = (
    SparkSession.builder
    .appName("Logistic Regression Churn Prediction")
    .getOrCreate()
)

# One row per model: name, accuracy, and the two error counts. All fields nullable.
schema = (
    StructType()
    .add("Model_Type", StringType(), True)
    .add("Accuracy", DoubleType(), True)
    .add("False_Positive_Count", IntegerType(), True)
    .add("False_Negative_Count", IntegerType(), True)
)

result_df = spark.createDataFrame([], schema)

result_df.show()

+----------+--------+--------------------+--------------------+
|Model_Type|Accuracy|False_Positive_Count|False_Negative_Count|
+----------+--------+--------------------+--------------------+
+----------+--------+--------------------+--------------------+



In [None]:
#Decision Tree

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

# Decision tree on a reduced feature set; the metrics are recorded in result_df.
spark = (
    SparkSession.builder
    .appName("Decision Tree Churn Prediction")
    .getOrCreate()
)

# Reduced feature list; the later model cells reuse this variable.
feature_cols = ['tenure', 'MonthlyCharges', 'TotalCharges', 'SeniorCitizen_index',
                'MultipleLines_index', 'InternetService_index', 'OnlineSecurity_index',
                'OnlineBackup_index', 'TechSupport_index', 'StreamingTV_index',
                'StreamingMovies_index', 'Contract_index', 'PaperlessBilling_index',
                'PaymentMethod_index']

model_type = "Decision Tree"

# Every feature becomes a double, with missing values replaced by 0.
for col_name in feature_cols:
    indexed_df_dropped = indexed_df_dropped.withColumn(col_name, col(col_name).cast("double"))
indexed_df_dropped = indexed_df_dropped.fillna(0, subset=feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

dt = DecisionTreeClassifier(labelCol="churn_index", featuresCol="features")

pipeline = Pipeline(stages=[assembler, dt])

# Seeded 70/30 split, the same parameters the other model cells use.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="churn_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Error counts on the test split.
fp = predictions.filter("prediction == 1 AND churn_index == 0").count()
fn = predictions.filter("prediction == 0 AND churn_index == 1").count()

# Replace any earlier row for this model before appending, so re-running the
# cell does not leave duplicate rows in result_df.
new_row = spark.createDataFrame(
    [(model_type, accuracy, fp, fn)],
    ["Model_Type", "Accuracy", "False_Positive_Count", "False_Negative_Count"],
)
result_df = result_df.filter(col("Model_Type") != model_type).union(new_row)

predictions.select("customerID", "churn_index", "prediction").show()

print("False Positives:", fp)
print("False Negatives:", fn)
print("Accuracy:", accuracy)


+----------+-----------+----------+
|customerID|churn_index|prediction|
+----------+-----------+----------+
|0004-TLHLJ|        1.0|       1.0|
|0013-SMEOE|        0.0|       0.0|
|0015-UOCOJ|        0.0|       0.0|
|0016-QLJIS|        0.0|       0.0|
|0019-EFAEP|        0.0|       0.0|
|0019-GFNTW|        0.0|       0.0|
|0020-INWCK|        0.0|       0.0|
|0023-HGHWL|        1.0|       1.0|
|0023-XUOPT|        1.0|       1.0|
|0030-FNXPP|        0.0|       0.0|
|0031-PVLZI|        1.0|       0.0|
|0042-JVWOJ|        0.0|       0.0|
|0042-RLHYP|        0.0|       0.0|
|0048-LUMLS|        0.0|       0.0|
|0052-DCKON|        0.0|       0.0|
|0056-EPFBG|        0.0|       0.0|
|0057-QBUQH|        0.0|       0.0|
|0064-YIJGF|        0.0|       0.0|
|0071-NDAFP|        0.0|       0.0|
|0074-HDKDG|        0.0|       0.0|
+----------+-----------+----------+
only showing top 20 rows

False Positives: 175
False Negatives: 220
Accuracy: 0.8031888390632785


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic regression on feature_cols (defined in an earlier cell); the metrics
# are recorded in result_df.
spark = (
    SparkSession.builder
    .appName("Logistic Regression Churn Prediction")
    .getOrCreate()
)

model_type = "Logistic Regression"

# Every feature becomes a double, with missing values replaced by 0.
for col_name in feature_cols:
    indexed_df_dropped = indexed_df_dropped.withColumn(col_name, col(col_name).cast("double"))
indexed_df_dropped = indexed_df_dropped.fillna(0, subset=feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

lr = LogisticRegression(labelCol="churn_index", featuresCol="features")

pipeline = Pipeline(stages=[assembler, lr])

# Seeded 70/30 split, the same parameters the other model cells use.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="churn_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Error counts on the test split.
fp = predictions.filter("prediction == 1 AND churn_index == 0").count()
fn = predictions.filter("prediction == 0 AND churn_index == 1").count()

# Replace any earlier row for this model before appending, so re-running the
# cell does not leave duplicate rows in result_df.
new_row = spark.createDataFrame(
    [(model_type, accuracy, fp, fn)],
    ["Model_Type", "Accuracy", "False_Positive_Count", "False_Negative_Count"],
)
result_df = result_df.filter(col("Model_Type") != model_type).union(new_row)

predictions.select("customerID", "churn_index", "prediction").show()

print("False Positives:", fp)
print("False Negatives:", fn)
print("Accuracy:", accuracy)

+----------+-----------+----------+
|customerID|churn_index|prediction|
+----------+-----------+----------+
|0004-TLHLJ|        1.0|       1.0|
|0013-SMEOE|        0.0|       0.0|
|0015-UOCOJ|        0.0|       0.0|
|0016-QLJIS|        0.0|       0.0|
|0019-EFAEP|        0.0|       0.0|
|0019-GFNTW|        0.0|       0.0|
|0020-INWCK|        0.0|       0.0|
|0023-HGHWL|        1.0|       1.0|
|0023-XUOPT|        1.0|       1.0|
|0030-FNXPP|        0.0|       0.0|
|0031-PVLZI|        1.0|       0.0|
|0042-JVWOJ|        0.0|       0.0|
|0042-RLHYP|        0.0|       0.0|
|0048-LUMLS|        0.0|       0.0|
|0052-DCKON|        0.0|       0.0|
|0056-EPFBG|        0.0|       0.0|
|0057-QBUQH|        0.0|       0.0|
|0064-YIJGF|        0.0|       0.0|
|0071-NDAFP|        0.0|       0.0|
|0074-HDKDG|        0.0|       0.0|
+----------+-----------+----------+
only showing top 20 rows

False Positives: 130
False Negatives: 243
Accuracy: 0.8141504733432985


In [None]:
#gbt

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

# Gradient-boosted trees on feature_cols (defined in an earlier cell); the
# metrics are recorded in result_df.
spark = (
    SparkSession.builder
    .appName("GBT Churn Prediction")
    .getOrCreate()
)

model_type = "GBT"

# Every feature becomes a double, with missing values replaced by 0.
for col_name in feature_cols:
    indexed_df_dropped = indexed_df_dropped.withColumn(col_name, col(col_name).cast("double"))
indexed_df_dropped = indexed_df_dropped.fillna(0, subset=feature_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

gbt = GBTClassifier(labelCol="churn_index", featuresCol="features")

pipeline = Pipeline(stages=[assembler, gbt])

# Seeded 70/30 split, the same parameters the other model cells use.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

model = pipeline.fit(train_data)

predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="churn_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Error counts on the test split.
fp = predictions.filter("prediction == 1 AND churn_index == 0").count()
fn = predictions.filter("prediction == 0 AND churn_index == 1").count()

# Replace any earlier row for this model before appending, so re-running the
# cell does not leave duplicate rows in result_df.
new_row = spark.createDataFrame(
    [(model_type, accuracy, fp, fn)],
    ["Model_Type", "Accuracy", "False_Positive_Count", "False_Negative_Count"],
)
result_df = result_df.filter(col("Model_Type") != model_type).union(new_row)

predictions.select("customerID", "churn_index", "prediction").show()

print("False Positives:", fp)
print("False Negatives:", fn)
print("Accuracy:", accuracy)

+----------+-----------+----------+
|customerID|churn_index|prediction|
+----------+-----------+----------+
|0004-TLHLJ|        1.0|       1.0|
|0013-SMEOE|        0.0|       0.0|
|0015-UOCOJ|        0.0|       0.0|
|0016-QLJIS|        0.0|       0.0|
|0019-EFAEP|        0.0|       0.0|
|0019-GFNTW|        0.0|       0.0|
|0020-INWCK|        0.0|       0.0|
|0023-HGHWL|        1.0|       1.0|
|0023-XUOPT|        1.0|       1.0|
|0030-FNXPP|        0.0|       0.0|
|0031-PVLZI|        1.0|       0.0|
|0042-JVWOJ|        0.0|       0.0|
|0042-RLHYP|        0.0|       0.0|
|0048-LUMLS|        0.0|       0.0|
|0052-DCKON|        0.0|       0.0|
|0056-EPFBG|        0.0|       0.0|
|0057-QBUQH|        0.0|       0.0|
|0064-YIJGF|        0.0|       0.0|
|0071-NDAFP|        0.0|       0.0|
|0074-HDKDG|        0.0|       0.0|
+----------+-----------+----------+
only showing top 20 rows

False Positives: 134
False Negatives: 257
Accuracy: 0.8051818634778276


In [None]:
#KNN

from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline as SkPipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer

# scikit-learn KNN on feature_cols (defined in an earlier cell); the metrics are
# recorded in result_df alongside the Spark models.
spark = (
    SparkSession.builder
    .appName("KNN Churn Prediction")
    .getOrCreate()
)

model_type = "KNN"

# Every feature becomes a double, with missing values replaced by 0.
for col_name in feature_cols:
    indexed_df_dropped = indexed_df_dropped.withColumn(col_name, col(col_name).cast("double"))
indexed_df_dropped = indexed_df_dropped.fillna(0, subset=feature_cols)

# Split here, with the same parameters as the Spark model cells, rather than
# relying on a test_data variable left over from another cell.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

train_pd_df = train_data.toPandas()
test_pd_df = test_data.toPandas()

knn_pipeline = SkPipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('knn', KNeighborsClassifier(n_neighbors=5))
])

# Fit on the TRAINING rows only. Fitting on the whole frame (test rows included)
# leaks the evaluation data into the model and inflates the reported accuracy.
knn_pipeline.fit(train_pd_df[feature_cols], train_pd_df['churn_index'])

predictions = knn_pipeline.predict(test_pd_df[feature_cols])

# Back to Spark so the same evaluator and filters as the other models apply.
predictions_df = spark.createDataFrame(test_pd_df[['customerID', 'churn_index']].assign(prediction=predictions))

evaluator = MulticlassClassificationEvaluator(labelCol="churn_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions_df)

# Error counts on the test split.
fp = predictions_df.filter("prediction == 1 AND churn_index == 0").count()
fn = predictions_df.filter("prediction == 0 AND churn_index == 1").count()

# Replace any earlier row for this model before appending, so re-running the
# cell does not leave duplicate rows in result_df.
new_row = spark.createDataFrame(
    [(model_type, accuracy, fp, fn)],
    ["Model_Type", "Accuracy", "False_Positive_Count", "False_Negative_Count"],
)
result_df = result_df.filter(col("Model_Type") != model_type).union(new_row)

predictions_df.select("customerID", "churn_index", "prediction").show()

print("False Positives:", fp)
print("False Negatives:", fn)
print("Accuracy:", accuracy)


+----------+-----------+----------+
|customerID|churn_index|prediction|
+----------+-----------+----------+
|0004-TLHLJ|        1.0|       1.0|
|0013-SMEOE|        0.0|       0.0|
|0015-UOCOJ|        0.0|       1.0|
|0016-QLJIS|        0.0|       0.0|
|0019-EFAEP|        0.0|       0.0|
|0019-GFNTW|        0.0|       0.0|
|0020-INWCK|        0.0|       0.0|
|0023-HGHWL|        1.0|       1.0|
|0023-XUOPT|        1.0|       1.0|
|0030-FNXPP|        0.0|       0.0|
|0031-PVLZI|        1.0|       0.0|
|0042-JVWOJ|        0.0|       0.0|
|0042-RLHYP|        0.0|       0.0|
|0048-LUMLS|        0.0|       0.0|
|0052-DCKON|        0.0|       0.0|
|0056-EPFBG|        0.0|       0.0|
|0057-QBUQH|        0.0|       0.0|
|0064-YIJGF|        0.0|       0.0|
|0071-NDAFP|        0.0|       0.0|
|0074-HDKDG|        0.0|       0.0|
+----------+-----------+----------+
only showing top 20 rows

False Positives: 114
False Negatives: 197
Accuracy: 0.8450423517688092


In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns


# Scatter plot of the actual churners in the test split (tenure vs. TotalCharges),
# coloured by the label KNN predicted for them.

# Re-create the seeded 70/30 split so the model below is fitted on training rows
# only. Fitting on the whole frame would include the very rows being predicted.
train_data, test_data = indexed_df_dropped.randomSplit([0.7, 0.3], seed=42)

train_pd_df = train_data.toPandas()

knn_pipeline = SkPipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler', StandardScaler()),
    ('knn', KNeighborsClassifier(n_neighbors=5))
])

knn_pipeline.fit(train_pd_df[feature_cols], train_pd_df['churn_index'])

test_pd_df = test_data.toPandas()

predictions = knn_pipeline.predict(test_pd_df[feature_cols])

predictions_df = spark.createDataFrame(test_pd_df[['customerID','tenure','TotalCharges','churn_index']].assign(prediction=predictions))

# Keep only the customers whose true label is churn (churn_index == 1).
positive_churn_predictions = predictions_df.filter("churn_index == 1")

positive_churn_pd_df = positive_churn_predictions.toPandas()

plt.figure(figsize=(10, 6))
sns.scatterplot(data=positive_churn_pd_df, x='tenure', y='TotalCharges', hue='prediction', palette='viridis', alpha=0.6)
plt.title('Positive Churn Predictions')
plt.xlabel('Tenure')
plt.ylabel('TotalCharges')
plt.legend(title='Prediction')

plt.show()

In [None]:
# Share of actual churners that each model missed (false negatives / actual positives).
#
# The False_Negative_Count values were measured on the test split, so the
# denominator must be the number of actual churners in that same split, not a
# hard-coded constant. NOTE(review): the previous literal 1869 looks like the
# churner count of the full dataset; confirm.
test_churner_count = test_data.filter(col("churn_index") == 1).count()

false_negative_percentage = (col("False_Negative_Count") / test_churner_count) * 100

result_df = result_df.withColumn("False_Negative_Percentage",
                                  concat(format_number(false_negative_percentage, 2), lit("%")))

# Sort on the numeric count: the percentage column is a formatted string, which
# would sort lexicographically (e.g. "10.54%" before "9.00%"). With one shared
# denominator, ordering by count equals ordering by percentage.
result_df.orderBy("False_Negative_Count", ascending=True).show()

+-------------------+------------------+--------------------+--------------------+-------------------------+
|         Model_Type|          Accuracy|False_Positive_Count|False_Negative_Count|False_Negative_Percentage|
+-------------------+------------------+--------------------+--------------------+-------------------------+
|                KNN|0.8450423517688092|                 114|                 197|                   10.54%|
|      Decision Tree|0.8031888390632785|                 175|                 220|                   11.77%|
|Logistic Regression|0.8141504733432985|                 130|                 243|                   13.00%|
|                GBT|0.8051818634778276|                 134|                 257|                   13.75%|
+-------------------+------------------+--------------------+--------------------+-------------------------+



In [None]:
# Size of telco_df: the row count triggers a Spark job, the column count is
# read from the schema. The "features" figure counts every column of telco_df.
num_rows, num_features = telco_df.count(), len(telco_df.columns)

print("Number of rows:", num_rows)
print("Number of features:", num_features)

Number of rows: 7043
Number of features: 21
