In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName('Dataframe').getOrCreate()

In [3]:
# To import our dataset
df= spark.read.option('header','true').csv('Tele Churn-Dataset.csv', inferSchema=True)

In [4]:
# To display the first three rows
df.show(3)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+---------------+--------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|   PaymentMethod|MonthlyCharges|TotalCharges|numAdminTickets|numTechTickets|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+---------------+--------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone serv

In [5]:
# To check for dataset info
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- numAdminTickets: integer (nullable = true)
 |-- numTechTickets: integer (nullable = true)
 |-- Churn: string (null

In [7]:
# To check columns in our dataset
print(df.columns)

['customerID', 'gender', 'SeniorCitizen', 'Partner', 'Dependents', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV', 'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges', 'TotalCharges', 'numAdminTickets', 'numTechTickets', 'Churn']


In [6]:
# To check for missing values
from pyspark.sql.functions import col, sum
missing_values = df.select(*[sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
missing_values.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+---------------+--------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|TotalCharges|numAdminTickets|numTechTickets|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+-------------+--------------+------------+---------------+--------------+-----+
|         0|     0|            0|      0|         0|     0|           0|            0|              0|             0|    

In [8]:
# To change datatype from integer to string
df = df.withColumn("SeniorCitizen", col("SeniorCitizen").cast("string"))

In [9]:
# To check if has been converted
df.select('SeniorCitizen').dtypes

[('SeniorCitizen', 'string')]

In [10]:
# To convert string to float
df = df.withColumn('TotalCharges', col('TotalCharges').cast('float'))
df.select('TotalCharges').dtypes

[('TotalCharges', 'float')]

In [11]:
# To check the summary statistic of numeric columns
df.describe().toPandas()

Unnamed: 0,summary,customerID,gender,SeniorCitizen,Partner,Dependents,tenure,PhoneService,MultipleLines,InternetService,...,StreamingTV,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,MonthlyCharges,TotalCharges,numAdminTickets,numTechTickets,Churn
0,count,7043,7043,7043.0,7043,7043,7043.0,7043,7043,7043,...,7043,7043,7043,7043,7043,7043.0,7032.0,7043.0,7043.0,7043
1,mean,,,0.1621468124378816,,,32.37114865824223,,,,...,,,,,,64.76169246059922,2283.300441385536,0.5156893369302854,0.4195655260542382,
2,stddev,,,0.3686116056100135,,,24.55948102309444,,,,...,,,,,,30.09004709767848,2266.771363107635,1.275298736979496,1.250116942584854,
3,min,0002-ORFBO,Female,0.0,No,No,0.0,No,No,DSL,...,No,No,Month-to-month,No,Bank transfer (automatic),18.25,18.8,0.0,0.0,No
4,max,9995-HOTOH,Male,1.0,Yes,Yes,72.0,Yes,Yes,No,...,Yes,Yes,Two year,Yes,Mailed check,118.75,8684.8,5.0,9.0,Yes


In [12]:
# dropped unwanted columns
df = df[['gender', 'SeniorCitizen', 'Partner', 'tenure', 'PhoneService', 'MultipleLines', 'InternetService', 
         'OnlineSecurity', 'OnlineBackup', 'TechSupport', 'StreamingTV',
         'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'MonthlyCharges',
         'TotalCharges', 'numAdminTickets', 'numTechTickets', 'Churn']]

In [13]:
# To check for missing values 
missing_values = df.select(*[sum(col('TotalCharges').isNull().cast("float")).alias('TotalCharges') for column in df.columns])
missing_values.show()

+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|TotalCharges|
+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|        11.0|     

In [15]:
# To fill missing values with median
from pyspark.ml.feature import Imputer
imputer = Imputer(
    inputCols=['TotalCharges'],
    outputCols=["{}_imputed".format('TotalCharges')]
).setStrategy('median')

# Fit the Imputer model and transform the DataFrame
imputer_model = imputer.fit(df)
df = imputer_model.transform(df)

In [16]:
missing_values = df.select(*[sum(col('TotalCharges_imputed').isNull().cast("float")).alias('TotalCharges_Imputer_m') for column in df.columns])
missing_values.show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|TotalCharges_Imputer_m|
+----------------------+------

In [19]:
print(df.dtypes)

[('gender', 'string'), ('SeniorCitizen', 'string'), ('Partner', 'string'), ('tenure', 'int'), ('PhoneService', 'string'), ('MultipleLines', 'string'), ('InternetService', 'string'), ('OnlineSecurity', 'string'), ('OnlineBackup', 'string'), ('TechSupport', 'string'), ('StreamingTV', 'string'), ('StreamingMovies', 'string'), ('Contract', 'string'), ('PaperlessBilling', 'string'), ('PaymentMethod', 'string'), ('MonthlyCharges', 'double'), ('TotalCharges', 'float'), ('numAdminTickets', 'int'), ('numTechTickets', 'int'), ('Churn', 'string'), ('TotalCharges_imputed', 'float')]


In [20]:
# To replace NO phone service to No
df = df.replace('No phone service', 'No')
df = df.replace('No internet service', 'No')
df.show(2)

+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+---------------+--------------+-----+--------------------+
|gender|SeniorCitizen|Partner|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|   PaymentMethod|MonthlyCharges|TotalCharges|numAdminTickets|numTechTickets|Churn|TotalCharges_imputed|
+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+----------------+--------------+------------+---------------+--------------+-----+--------------------+
|Female|            0|    Yes|     1|          No|           No|            DSL|            No|         Yes|         No|         No|             No|

In [21]:
#from pyspark.sql.functions import when

#df = df.withColumn('PhoneService', when(df['PhoneService'] == 'No phone service', 'No').otherwise(df['PhoneService']))

#df.show()

In [21]:
# To split our test and train data
train, test =df.randomSplit([0.75,0.25])

In [22]:
# To select numeric columns
numerical_features = train.columns
numerical_features.remove('gender')
numerical_features.remove('SeniorCitizen')
numerical_features.remove('PhoneService')
numerical_features.remove('MultipleLines')
numerical_features.remove('InternetService')
numerical_features.remove('TechSupport')
numerical_features.remove('StreamingTV')
numerical_features.remove('PaperlessBilling')
numerical_features.remove('Contract')
numerical_features.remove('PaymentMethod')
numerical_features.remove('OnlineSecurity')
numerical_features.remove('OnlineBackup')
numerical_features.remove('Partner')
numerical_features.remove('Churn')
numerical_features.remove('StreamingMovies')
numerical_features.remove('TotalCharges')

In [23]:
print(numerical_features)

['tenure', 'MonthlyCharges', 'numAdminTickets', 'numTechTickets', 'TotalCharges_imputed']


In [24]:
from pyspark.ml.feature import VectorAssembler

numerical_vector_assembler = VectorAssembler(inputCols=numerical_features, outputCol='numerical_features_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(1)

+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+---------------+--------------+-----+--------------------+-------------------------+
|gender|SeniorCitizen|Partner|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|numAdminTickets|numTechTickets|Churn|TotalCharges_imputed|numerical_features_vector|
+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+---------------+--------------+-----+--------------------+-------------------------+
|Female|            0|     No|     1|          No|        

In [25]:
# To scale numeric features
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='numerical_features_vector', outputCol='scaled_numerical_features_vector',
                       withMean=True, withStd=True)
scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)

train.show(3)

+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+---------------+--------------+-----+--------------------+-------------------------+--------------------------------+
|gender|SeniorCitizen|Partner|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|numAdminTickets|numTechTickets|Churn|TotalCharges_imputed|numerical_features_vector|scaled_numerical_features_vector|
+------+-------------+-------+------+------------+-------------+---------------+--------------+------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+---------------+--------------+-----+--------------------+-------------------

In [26]:
# To check the scales features
train.select('scaled_numerical_features_vector').show(truncate=False)

+------------------------------------------------------------------------------------------------------+
|scaled_numerical_features_vector                                                                      |
+------------------------------------------------------------------------------------------------------+
|[-1.2860087573469972,-1.3203161350835437,-0.41124114221123836,-0.3346561932383224,-1.0019135678956095]|
|[-1.2860087573469972,-1.3419754027702782,2.6989265502985487,-0.3346561932383224,-1.0021997834036842]  |
|[-1.2860087573469972,-1.348639792827735,-0.41124114221123836,-0.3346561932383224,-1.0022878501014916] |
|[-1.2860087573469972,-1.3219822325979078,1.1438427040436554,-0.3346561932383224,-1.0019355841501283]  |
|[-1.2860087573469972,-1.331978817684093,-0.41124114221123836,-0.3346561932383224,-1.0020676841968394] |
|[-1.2860087573469972,-1.3253144276266362,-0.41124114221123836,-0.3346561932383224,-1.0019796174990319]|
|[-1.2860087573469972,-1.3236483301122721,3.47646847342

In [27]:
# To convert our caterogical features to numeric features
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCols=['gender', 'SeniorCitizen', 'Partner', 'PhoneService', 'MultipleLines', 'InternetService', 
         'OnlineSecurity', 'OnlineBackup', 'TechSupport', 'StreamingTV',
         'StreamingMovies', 'Contract', 'PaperlessBilling', 'PaymentMethod', 'Churn'], 
                        outputCols=['gender_index', 'SeniorCitizen_index', 'Partner_index', 'PhoneService_index', 
                                    'MultipleLines_index', 'InternetService_index', 'OnlineSecurity_index', 'OnlineBackup_index', 
                                    'TechSupport_index', 'StreamingTV_index', 'StreamingMovies_index', 'Contract_index', 
                                    'PaperlessBilling_index', 'PaymentMethod_index', 'Churn_index'])

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

In [28]:
# To check if they have been converted but checking the gender feature
set(train.select('gender_index').collect())

{Row(gender_index=0.0), Row(gender_index=1.0)}

In [29]:
# To combine multiple features to a single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['scaled_numerical_features_vector', 'gender_index', 'SeniorCitizen_index', 'Partner_index', 'PhoneService_index', 
                                    'MultipleLines_index', 'InternetService_index', 'OnlineSecurity_index', 'OnlineBackup_index', 
                                    'TechSupport_index', 'StreamingTV_index', 'StreamingMovies_index', 'Contract_index', 
                                    'PaperlessBilling_index', 'PaymentMethod_index'],
                                       outputCol='finalized_data')

train = assembler.transform(train)
test = assembler.transform(test)

In [30]:
train.select('finalized_data').take(2)

[Row(finalized_data=SparseVector(19, {0: -1.286, 1: -1.3203, 2: -0.4112, 3: -0.3347, 4: -1.0019, 5: 1.0, 8: 1.0, 10: 1.0, 17: 1.0, 18: 2.0})),
 Row(finalized_data=SparseVector(19, {0: -1.286, 1: -1.342, 2: 2.6989, 3: -0.3347, 4: -1.0022, 5: 1.0, 8: 1.0, 10: 1.0, 17: 1.0}))]

In [31]:
from pyspark.ml.classification import LogisticRegression

LR = LogisticRegression(featuresCol='finalized_data', labelCol='Churn_index')
lr_model =LR.fit(train)

In [32]:
pred = lr_model.transform(test)

In [33]:
# To show prediction f our model
pred.select(['Churn_index','prediction']).show(5)

+-----------+----------+
|Churn_index|prediction|
+-----------+----------+
|        0.0|       1.0|
|        0.0|       1.0|
|        1.0|       1.0|
|        1.0|       1.0|
|        0.0|       0.0|
+-----------+----------+
only showing top 5 rows



In [34]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Create a MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='Churn_index', predictionCol='prediction')

# Calculate the evaluation metric
accuracy = evaluator.evaluate(pred)
print("Accuracy:", accuracy)


# Precision
precision = evaluator.setMetricName('weightedPrecision').evaluate(pred)
print("Precision:", precision)

# Recall
recall = evaluator.setMetricName('weightedRecall').evaluate(pred)
print("Recall:", recall)

# F1-score
f1_score = evaluator.setMetricName('f1').evaluate(pred)
print("F1-score:", f1_score)

Accuracy: 0.8618346545866364
Precision: 0.8586650410976584
Recall: 0.8618346545866364
F1-score: 0.8594117285901588


In [35]:
confusion_matrix = pred.groupBy('Churn_index').pivot('prediction').count().na.fill(0)

# Display the confusion matrix
print("Confusion Matrix:")
confusion_matrix.show()

Confusion Matrix:
+-----------+----+---+
|Churn_index| 0.0|1.0|
+-----------+----+---+
|        0.0|1183| 97|
|        1.0| 147|339|
+-----------+----+---+



In [41]:
coefficients = lr_model.coefficients
feature_importance = [(feature, coefficient) for feature, coefficient in zip(train, coefficients)]

# Sort the feature importance in descending order
sorted_feature_importance = sorted(feature_importance, key=lambda x: abs(x[1]), reverse=True)

# Print the feature importance
for feature, importance in sorted_feature_importance:
    print(f"Feature: {feature}, Importance: {importance}")

Feature: Column<'gender'>, Importance: -1.8659852771599688
Feature: Column<'StreamingTV'>, Importance: -1.7279379381490698
Feature: Column<'tenure'>, Importance: 1.6884904414507846
Feature: Column<'SeniorCitizen'>, Importance: -0.8254948229055694
Feature: Column<'TotalCharges'>, Importance: -0.5341117096735656
Feature: Column<'TechSupport'>, Importance: 0.47219157765554887
Feature: Column<'StreamingMovies'>, Importance: -0.39842732488858784
Feature: Column<'numAdminTickets'>, Importance: -0.35819890997105996
Feature: Column<'PaymentMethod'>, Importance: 0.297423652603403
Feature: Column<'PhoneService'>, Importance: -0.2573576225052059
Feature: Column<'InternetService'>, Importance: 0.2517474070622676
Feature: Column<'MonthlyCharges'>, Importance: 0.23299058882127066
Feature: Column<'Contract'>, Importance: -0.22850375050860705
Feature: Column<'OnlineSecurity'>, Importance: -0.12096551066373451
Feature: Column<'numTechTickets'>, Importance: -0.11755786778143341
Feature: Column<'Partner'