In [1]:
# Install findspark into the running environment. No version is pinned, so the
# installed release can differ between runs.
!pip install findspark
# findspark.init() is called before pyspark is imported.
# NOTE(review): presumably this is what makes the local Spark install importable — confirm.
import findspark
findspark.init()
import kagglehub
import pyspark
# Wildcard imports put every public name of both modules into the notebook namespace.
# Later cells use names such as `desc` without importing them first, so they depend on
# these two lines. Any cell that assigns to one of these names (for example a loop
# variable called `col`) silently replaces the Spark function of the same name.
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Explicit imports of selected types and functions.
# NOTE: this binds `round` to the Spark column function, so Python's built-in round()
# is no longer reachable under that name in this notebook.
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

# SparkSession is the entry point used below to create the session.
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used throughout the notebook
spark = (
    SparkSession.builder
    .appName("CustomerSupportAnalysis")
    .getOrCreate()
)

# Fetch the latest copy of the dataset, then point at the CSV file inside it
path = kagglehub.dataset_download("blastchar/telco-customer-churn")
path = path + "/WA_Fn-UseC_-Telco-Customer-Churn.csv"

# Read the CSV with its header row and let Spark infer the column types,
# then preview the first five rows
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(path)
)
df.limit(5).show()


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+--------

In [2]:
# Number of customers per gender
(
    df.groupBy("gender")
    .count()
    .show()
)

+------+-----+
|gender|count|
+------+-----+
|Female| 3488|
|  Male| 3555|
+------+-----+



In [3]:
# Print a frequency table for every column, most common value first.
# The loop variable is deliberately not called `col`: that name is also the
# pyspark.sql.functions.col helper brought in by the wildcard import, and rebinding
# it to a string would break any later `col(...)` call until it is re-imported.
for column_name in df.columns:
    df.groupBy(column_name).count().orderBy(desc("count")).show()

+----------+-----+
|customerID|count|
+----------+-----+
|3668-QPYBK|    1|
|6234-RAAPL|    1|
|1894-IGFSG|    1|
|6982-SSHFK|    1|
|5859-HZYLF|    1|
|6479-OAUSD|    1|
|2592-YKDIF|    1|
|6718-BDGHG|    1|
|3195-TQDZX|    1|
|4248-QPAVC|    1|
|5668-MEISB|    1|
|5802-ADBRC|    1|
|2712-SYWAY|    1|
|2011-TRQYE|    1|
|7244-KXYZN|    1|
|0953-LGOVU|    1|
|3623-FQBOX|    1|
|3692-JHONH|    1|
|3528-HFRIQ|    1|
|7661-CPURM|    1|
+----------+-----+
only showing top 20 rows

+------+-----+
|gender|count|
+------+-----+
|  Male| 3555|
|Female| 3488|
+------+-----+

+-------------+-----+
|SeniorCitizen|count|
+-------------+-----+
|            0| 5901|
|            1| 1142|
+-------------+-----+

+-------+-----+
|Partner|count|
+-------+-----+
|     No| 3641|
|    Yes| 3402|
+-------+-----+

+----------+-----+
|Dependents|count|
+----------+-----+
|        No| 4933|
|       Yes| 2110|
+----------+-----+

+------+-----+
|tenure|count|
+------+-----+
|     1|  613|
|    72|  362|
|     2

In [4]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc

# Number the rows inside each Churn group from longest to shortest tenure.
# NOTE: many customers share the same tenure, so the relative order of tied rows
# (and therefore their row_number) is not fixed by this ordering alone.
window = Window.partitionBy("Churn").orderBy(desc("tenure"))

# Keep the DataFrame in window_df and display it as a separate step:
# DataFrame.show() only prints and returns None, so chaining it onto the
# assignment would leave window_df set to None.
window_df = df.withColumn("row_number", row_number().over(window))
window_df.show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+----------------+--------------------+--------------+------------+-----+----------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|row_number|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+----------------+--------------------+--------------+------------+-----+----------+
|5248-YGIJN|  Male|            0|    Yes|     

In [5]:
# Attach the per-Churn-group row number (ordered by descending tenure) to every row
window_df = df.withColumn(
    "row_number",
    row_number().over(window),
)
# Show only the first three rows of each Churn group
window_df.where(window_df["row_number"] <= 3).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+--------------------+--------------+------------+-----+----------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|row_number|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+--------------------+--------------+------------+-----+----------+
|5248-YGIJN|  Male|            0|    Yes|        No|    72|         Yes|          Yes|            DSL|           Yes|         Yes|             Yes|        Yes| 

In [6]:
# Customers who churned. The DataFrame is stored first and previewed separately:
# DataFrame.show() returns None, so assigning its result would leave churn_df empty.
# The column is referenced with its schema spelling ("Churn") so the lookup does not
# depend on Spark's case-insensitive column resolution.
churn_df = window_df.filter(window_df["Churn"] == "Yes")
churn_df.limit(10).show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+----------+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|row_number|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+----------+
|8809-RIHDD|  Male|            0|    Yes|       Yes|    72|         Yes|             Yes|    Fiber optic|            No|         Yes|

In [7]:
# Customers who churned ("Churn" spelled as in the schema so the lookup does not
# rely on case-insensitive column resolution).
churn_df = window_df.filter(window_df["Churn"] == "Yes")

# Columns left out of the value-count report: the customer identifier, the
# tenure/charge columns, and the helper row_number column.
skipped_columns = {"customerID", "tenure", "MonthlyCharges", "TotalCharges", "row_number"}

# The loop variable is not named `col` so that it does not overwrite the
# pyspark.sql.functions.col helper available in the notebook namespace.
for column_name in churn_df.columns:
    if column_name in skipped_columns:
        continue
    print(column_name)
    # Build the aggregation once and reuse it for both the value listing and the table.
    value_counts = churn_df.groupBy(column_name).count().orderBy(desc("count"))
    for row in value_counts.collect():
        print(row[column_name])
    value_counts.show()



gender
Female
Male
+------+-----+
|gender|count|
+------+-----+
|Female|  939|
|  Male|  930|
+------+-----+

SeniorCitizen
0
1
+-------------+-----+
|SeniorCitizen|count|
+-------------+-----+
|            0| 1393|
|            1|  476|
+-------------+-----+

Partner
No
Yes
+-------+-----+
|Partner|count|
+-------+-----+
|     No| 1200|
|    Yes|  669|
+-------+-----+

Dependents
No
Yes
+----------+-----+
|Dependents|count|
+----------+-----+
|        No| 1543|
|       Yes|  326|
+----------+-----+

PhoneService
Yes
No
+------------+-----+
|PhoneService|count|
+------------+-----+
|         Yes| 1699|
|          No|  170|
+------------+-----+

MultipleLines
Yes
No
No phone service
+----------------+-----+
|   MultipleLines|count|
+----------------+-----+
|             Yes|  850|
|              No|  849|
|No phone service|  170|
+----------------+-----+

InternetService
Fiber optic
DSL
No
+---------------+-----+
|InternetService|count|
+---------------+-----+
|    Fiber optic| 1297|
| 

In [8]:
# Print the column names of churn_df (the original columns plus the added row_number).
print(churn_df.columns)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'Churn', 'row_number']


In [9]:
from pyspark.sql import functions as F

# Total number of churned customers; used as the denominator for the percentages below
total_churn = churn_df.count()
print(type(total_churn))

# Columns left out of the breakdown: the customer identifier, the tenure/charge
# columns, and the helper row_number column.
excluded_columns = ["customerID", "tenure", "MonthlyCharges", "TotalCharges", "row_number"]

# For each remaining column, show how many churned customers fall under each value
# and what share of all churned customers that is.
# The loop variable is not named `col` so that it does not overwrite the
# pyspark.sql.functions.col helper available in the notebook namespace.
for column_name in churn_df.columns:
    if column_name in excluded_columns:
        continue
    print(f"\nColumn: {column_name}")
    (
        churn_df.groupBy(column_name)
        .agg(
            F.count("*").alias("count"),
            (F.count("*") / total_churn * 100).alias("percentage"),
        )
        .orderBy(F.desc("count"))
        .show(truncate=False)
    )


<class 'int'>

Column: gender
+------+-----+-----------------+
|gender|count|percentage       |
+------+-----+-----------------+
|Female|939  |50.24077046548957|
|Male  |930  |49.75922953451043|
+------+-----+-----------------+


Column: SeniorCitizen
+-------------+-----+------------------+
|SeniorCitizen|count|percentage        |
+-------------+-----+------------------+
|0            |1393 |74.53183520599251 |
|1            |476  |25.468164794007492|
+-------------+-----+------------------+


Column: Partner
+-------+-----+-----------------+
|Partner|count|percentage       |
+-------+-----+-----------------+
|No     |1200 |64.20545746388443|
|Yes    |669  |35.79454253611557|
+-------+-----+-----------------+


Column: Dependents
+----------+-----+------------------+
|Dependents|count|percentage        |
+----------+-----+------------------+
|No        |1543 |82.55751738897807 |
|Yes       |326  |17.442482611021937|
+----------+-----+------------------+


Column: PhoneService
+-------

In [12]:
# Preview the first ten rows of the full dataset
df.show(n=10)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

In [18]:
# Columns removed before modelling
drop_feature = [
    "customerID",
    "gender",
    "SeniorCitizen",
    "PaperlessBilling",
    "PaymentMethod",
]

# Print the column list before and after the removal; all listed columns are
# dropped in a single call.
print(df.columns)
df = df.drop(*drop_feature)
print(df.columns)

['gender', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'MonthlyCharges', 'TotalCharges', 'Churn']
['Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'MonthlyCharges', 'TotalCharges', 'Churn']


In [23]:
# Sanity check that the DataFrame still contains rows (recorded result: False).
df.isEmpty()

False

In [27]:
# Discard every row that has a null in any column.
# Rows whose TotalCharges is blank text are filtered separately in the training cell.
df = df.na.drop()

In [29]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

# Feature groups: text-valued columns to one-hot encode, numeric columns to scale
categorical = [
    'Partner',
    'Dependents',
    'PhoneService',
    'MultipleLines',
    'InternetService',
    'OnlineSecurity',
    'OnlineBackup',
    'DeviceProtection',
    'TechSupport',
    'StreamingTV',
    'StreamingMovies',
    'Contract',
]
numerical = [
    'tenure',
    'MonthlyCharges',
    'TotalCharges',
]

# Preprocessing: standardise the numeric columns and one-hot encode the categorical
# ones; categories not seen during fitting are ignored at transform time.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical),
    ]
)


In [32]:
# Print the ColumnTransformer configuration to confirm which columns go to each step.
print(preprocessor)

ColumnTransformer(transformers=[('num', StandardScaler(),
                                 ['tenure', 'MonthlyCharges', 'TotalCharges']),
                                ('cat', OneHotEncoder(handle_unknown='ignore'),
                                 ['Partner', 'Dependents', 'PhoneService',
                                  'MultipleLines', 'InternetService',
                                  'OnlineSecurity', 'OnlineBackup',
                                  'DeviceProtection', 'TechSupport',
                                  'StreamingTV', 'StreamingMovies',
                                  'Contract'])])


In [30]:
# Full model: preprocessing followed by a random forest with a fixed seed
clf = Pipeline(
    [
        ('preprocess', preprocessor),
        ('classifier', RandomForestClassifier(random_state=42)),
    ]
)


In [44]:
from pyspark.sql.functions import when, col, trim

# Binary target: 1 for churned customers, 0 otherwise
df = df.withColumn("label", when(col("Churn") == "Yes", 1).otherwise(0))
# Remove rows whose TotalCharges is blank
df = df.filter(trim(col("TotalCharges")) != "")

# Collect to pandas exactly once so features and target come from the same rows in
# the same order; two separate toPandas() calls are independent Spark jobs with no
# guaranteed row alignment. TotalCharges is cast to a number here so the scaler
# receives numeric data instead of relying on implicit string conversion.
model_pdf = df.withColumn("TotalCharges", col("TotalCharges").cast("double")).toPandas()

# X keeps every column except the target. "Churn" is still present in X, but it is
# not listed in either feature group given to the ColumnTransformer.
X = model_pdf.drop(columns=["label"])

# y is a 1-D Series rather than a one-column DataFrame; passing a column vector to
# fit() makes scikit-learn emit a DataConversionWarning.
y = model_pdf["label"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)

print(classification_report(y_test, y_pred))


  return fit_method(estimator, *args, **kwargs)


              precision    recall  f1-score   support

           0       0.82      0.89      0.85      1033
           1       0.59      0.45      0.51       374

    accuracy                           0.77      1407
   macro avg       0.70      0.67      0.68      1407
weighted avg       0.76      0.77      0.76      1407

