In [1]:
import pandas as pd
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as func
from pyspark.sql import SparkSession


sqlCtx = SparkSession.builder.getOrCreate()

In [3]:
data = 'new_data4.csv'
df_one = pd.read_csv(data, index_col=0)

df_one.head()

Unnamed: 0_level_0,gender,SeniorCitizen,Dependents,tenure,PhoneService,MultipleLines,OnlineBackup,DeviceProtection,TechSupport,StreamingMovies,Contract,PaperlessBilling,PaymentMethod,TotalCharges,Churn
customerID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
4472-LVYGI,Female,0,Yes,0,No,No phone service,No,Yes,Yes,No,Two year,Yes,Bank transfer (automatic),1.0,No
3115-CZMZD,Male,0,Yes,0,Yes,No,No internet service,No internet service,No internet service,No internet service,Two year,No,Mailed check,1.0,No
5709-LVOEQ,Female,0,Yes,0,Yes,No,Yes,Yes,No,Yes,Two year,No,Mailed check,1.0,No
4367-NUYAO,Male,0,Yes,0,Yes,Yes,No internet service,No internet service,No internet service,No internet service,Two year,No,Mailed check,1.0,No
1371-DWPAZ,Female,0,Yes,0,No,No phone service,Yes,Yes,Yes,No,Two year,No,Credit card (automatic),1.0,No


In [13]:
data_dummies = pd.get_dummies(df_one['Churn']).head()
print(data_dummies)

            No  Yes
customerID         
4472-LVYGI   1    0
3115-CZMZD   1    0
5709-LVOEQ   1    0
4367-NUYAO   1    0
1371-DWPAZ   1    0


In [23]:
data_all = pd.concat([df_one, data_dummies], axis = 1)
print(data_all)

            gender  SeniorCitizen Dependents  tenure PhoneService  \
customerID                                                          
4472-LVYGI  Female              0        Yes       0           No   
3115-CZMZD    Male              0        Yes       0          Yes   
5709-LVOEQ  Female              0        Yes       0          Yes   
4367-NUYAO    Male              0        Yes       0          Yes   
1371-DWPAZ  Female              0        Yes       0           No   
...            ...            ...        ...     ...          ...   
9924-JPRMC    Male              0         No      72          Yes   
9788-HNGUT    Male              0         No      72          Yes   
9739-JLPQJ  Female              0        Yes      72          Yes   
7569-NMZYQ  Female              0        Yes      72          Yes   
2889-FPWRM    Male              0         No      72          Yes   

               MultipleLines         OnlineBackup     DeviceProtection  \
customerID                  

In [6]:
charges_training = sqlCtx.read.option('header','true').options(delimiter=",").csv('new_data4.csv')
print("column types:", charges_training.dtypes)
print("Rows :", charges_training.count())

charges_training = charges_training.withColumn("TotalCharges_Double", func.col("TotalCharges").cast(DoubleType()))
print("column types:", charges_training.dtypes)

charges_training.show()

column types: [('customerID', 'string'), ('gender', 'string'), ('SeniorCitizen', 'string'), ('Dependents', 'string'), ('tenure', 'string'), ('PhoneService', 'string'), ('MultipleLines', 'string'), ('OnlineBackup', 'string'), ('DeviceProtection', 'string'), ('TechSupport', 'string'), ('StreamingMovies', 'string'), ('Contract', 'string'), ('PaperlessBilling', 'string'), ('PaymentMethod', 'string'), ('TotalCharges', 'string'), ('Churn', 'string')]
Rows : 7043
column types: [('customerID', 'string'), ('gender', 'string'), ('SeniorCitizen', 'string'), ('Dependents', 'string'), ('tenure', 'string'), ('PhoneService', 'string'), ('MultipleLines', 'string'), ('OnlineBackup', 'string'), ('DeviceProtection', 'string'), ('TechSupport', 'string'), ('StreamingMovies', 'string'), ('Contract', 'string'), ('PaperlessBilling', 'string'), ('PaymentMethod', 'string'), ('TotalCharges', 'string'), ('Churn', 'string'), ('TotalCharges_Double', 'double')]
+----------+------+-------------+----------+------+----

In [5]:
serviceIndexer = StringIndexer(inputCol="PhoneService", outputCol="indexedLabel")
serviceIndexer_fit = serviceIndexer.fit(charges_training)
dataframe_training = serviceIndexer_fit.transform(charges_training)
print(dataframe_training.dtypes)

dataframe_training.show()

[('customerID', 'string'), ('gender', 'string'), ('SeniorCitizen', 'string'), ('Dependents', 'string'), ('tenure', 'string'), ('PhoneService', 'string'), ('MultipleLines', 'string'), ('OnlineBackup', 'string'), ('DeviceProtection', 'string'), ('TechSupport', 'string'), ('StreamingMovies', 'string'), ('Contract', 'string'), ('PaperlessBilling', 'string'), ('PaymentMethod', 'string'), ('TotalCharges', 'string'), ('Churn', 'string'), ('TotalCharges_Double', 'double'), ('indexedLabel', 'double')]
+----------+------+-------------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+------------+-----+-------------------+------------+
|customerID|gender|SeniorCitizen|Dependents|tenure|PhoneService|   MultipleLines|       OnlineBackup|   DeviceProtection|        TechSupport|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|TotalCharges|Churn|To

In [6]:
churnIndexer = StringIndexer(inputCol="Churn", outputCol="churn feature")

churnIndexer_fit = churnIndexer.fit(dataframe_training)

dataframe_training = churnIndexer_fit.transform(dataframe_training)
print(dataframe_training.dtypes)

dataframe_training.show()

[('customerID', 'string'), ('gender', 'string'), ('SeniorCitizen', 'string'), ('Dependents', 'string'), ('tenure', 'string'), ('PhoneService', 'string'), ('MultipleLines', 'string'), ('OnlineBackup', 'string'), ('DeviceProtection', 'string'), ('TechSupport', 'string'), ('StreamingMovies', 'string'), ('Contract', 'string'), ('PaperlessBilling', 'string'), ('PaymentMethod', 'string'), ('TotalCharges', 'string'), ('Churn', 'string'), ('TotalCharges_Double', 'double'), ('indexedLabel', 'double'), ('churn feature', 'double')]
+----------+------+-------------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+------------+-----+-------------------+------------+-------------+
|customerID|gender|SeniorCitizen|Dependents|tenure|PhoneService|   MultipleLines|       OnlineBackup|   DeviceProtection|        TechSupport|    StreamingMovies|      Contract|PaperlessBilling

In [7]:
featureAssembler = VectorAssembler(inputCols = ['TotalCharges_Double', 'churn feature'] , outputCol='features')
dataframe_training = featureAssembler.transform(dataframe_training)
dataframe_training.show()

+----------+------+-------------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+------------+-----+-------------------+------------+-------------+-----------+
|customerID|gender|SeniorCitizen|Dependents|tenure|PhoneService|   MultipleLines|       OnlineBackup|   DeviceProtection|        TechSupport|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|TotalCharges|Churn|TotalCharges_Double|indexedLabel|churn feature|   features|
+----------+------+-------------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+------------+-----+-------------------+------------+-------------+-----------+
|4472-LVYGI|Female|            0|       Yes|     0|          No|No phone service|                 No|                Yes|

In [8]:
dataframe_training_output = dataframe_training.select(['indexedLabel', 'features'])
dataframe_training_output.show(5)

+------------+---------+
|indexedLabel| features|
+------------+---------+
|         1.0|[1.0,0.0]|
|         0.0|[1.0,0.0]|
|         0.0|[1.0,0.0]|
|         0.0|[1.0,0.0]|
|         1.0|[1.0,0.0]|
+------------+---------+
only showing top 5 rows



In [9]:
charges_test = sqlCtx.read.option('header','true').options(delimiter=",").csv('new_data4.csv')
charges_test = charges_test.withColumn("TotalCharges_Double",func.col("TotalCharges").cast(DoubleType()))

In [10]:
serviceIndexer_test = StringIndexer(inputCol="PhoneService", outputCol="indexedLabel")
serviceIndexer_fit = serviceIndexer_test.fit(charges_training)
dataframe_test = serviceIndexer_fit.transform(charges_training)

In [11]:
churnIndexer = StringIndexer(inputCol="Churn", outputCol="churn feature")
churnIndexer_fit = churnIndexer.fit(dataframe_test)
dataframe_test = churnIndexer_fit.transform(dataframe_test)

In [12]:
featureAssembler = VectorAssembler(inputCols = ['TotalCharges_Double', 'churn feature'] , outputCol='features')
dataframe_test = featureAssembler.transform(dataframe_test)

In [13]:
dataframe_test = dataframe_test.select(['indexedLabel', 'features'])
dataframe_test.show()

+------------+-----------+
|indexedLabel|   features|
+------------+-----------+
|         1.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         1.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0|  [1.0,0.0]|
|         0.0| [18.8,0.0]|
|         0.0|[18.85,0.0]|
|         0.0|[18.85,1.0]|
|         0.0| [18.9,0.0]|
|         0.0| [19.0,0.0]|
|         0.0|[19.05,0.0]|
|         0.0| [19.1,0.0]|
|         0.0| [19.1,1.0]|
|         0.0| [19.1,0.0]|
+------------+-----------+
only showing top 20 rows



In [14]:
dtc = DecisionTreeClassifier(featuresCol="features", labelCol="indexedLabel")

dtc = dtc.fit(dataframe_training)

pred = dtc.transform(dataframe_test)
pred.show(10)

+------------+---------+--------------+--------------------+----------+
|indexedLabel| features| rawPrediction|         probability|prediction|
+------------+---------+--------------+--------------------+----------+
|         1.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         1.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
|         0.0|[1.0,0.0]|[6361.0,682.0]|[0.90316626437597...|       0.0|
+------------+---------+--------------+--------------------+----

In [17]:
tp = pred.filter((pred.indexedLabel == 1) & (pred.prediction == 1)).count()
tn = pred.filter((pred.indexedLabel == 0) & (pred.prediction == 0)).count()
fp = pred.filter((pred.indexedLabel == 0) & (pred.prediction == 1)).count()
fn = pred.filter((pred.indexedLabel == 1) & (pred.prediction == 0)).count()

print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)

a = ((tp + tn)/pred.count())
r = float(tp) / (tp + fn)
p = float(tn) / (tn + fn)
f1 = 2 * ((p * r)/(p + r))

print("Accuracy:", a)
print("Recall:", r)
print("Precision:", p)
print("F1 score:", f1)

True Positives: 0
True Negatives: 6361
False Positives: 0
False Negatives: 682
Accuracy: 0.9031662643759761
Recall: 0.0
Precision: 0.9031662643759761
F1 score: 0.0
