In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkContext


# Attach to the SparkContext that is already running (or start one) and wrap
# it in a SQLContext so DataFrames can be read through it.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Read churn.csv into a DataFrame: the first row supplies the column names,
# and column types are inferred from the data instead of defaulting to string.
df_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load('churn.csv')
)

# Mark the DataFrame for caching; the train/test split is derived from it.
df_data.cache()
df_data.head(1)


In [5]:
# Print the inferred column names and types to check how the CSV was parsed.
# NOTE(review): the recorded output shows the first and last column names as
# 'state and churned' (each carrying a stray single quote) -- presumably the
# header row of churn.csv is wrapped in quotes; confirm against the file.
df_data.printSchema()

root
 |-- 'state: string (nullable = true)
 |-- account_length: double (nullable = true)
 |-- area_code: double (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- international_plan: string (nullable = true)
 |-- voice_mail_plan: string (nullable = true)
 |-- number_vmail_messages: double (nullable = true)
 |-- total_day_minutes: double (nullable = true)
 |-- total_day_calls: double (nullable = true)
 |-- total_day_charge: double (nullable = true)
 |-- total_eve_minutes: double (nullable = true)
 |-- total_eve_calls: double (nullable = true)
 |-- total_eve_charge: double (nullable = true)
 |-- total_night_minutes: double (nullable = true)
 |-- total_night_calls: double (nullable = true)
 |-- total_night_charge: double (nullable = true)
 |-- total_intl_minutes: double (nullable = true)
 |-- total_intl_calls: double (nullable = true)
 |-- total_intl_charge: double (nullable = true)
 |-- number_customer_service_calls: double (nullable = true)
 |-- churned': string (nullable = true)

In [6]:
# Randomly split the rows into roughly 80% training / 20% test data;
# the fixed seed keeps the split repeatable.
trainDF, testDF = df_data.randomSplit([0.8, 0.2], seed=12345)

In [7]:
# Number of rows that landed in the training split.
trainDF.count()

3956

In [8]:
# Number of rows that landed in the test split.
testDF.count()

1044

In [12]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler # see https://spark.apache.org/docs/latest/ml-features.html


# Pack the five numeric input columns into a single vector column named
# 'features', the column the classifier is configured to read.
assembler1 = VectorAssembler(
    outputCol='features',
    inputCols=[
        'number_customer_service_calls',
        'total_night_minutes',
        'total_day_minutes',
        'total_eve_minutes',
        'account_length',
    ],
)

# Transform labels
from pyspark.ml.feature import StringIndexer

# Index the string churn flag into a numeric 'label' column.
# The source column name really does end in a single quote (see the printed
# schema), hence the escaped quote inside the literal.
label_indexer = StringIndexer(
    outputCol='label',
    inputCol='churned\'',
)



StringIndexer_4e368afffebc0042573c

In [13]:
# Fit the model
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression



# Logistic regression reading the assembled 'features' vector and the
# indexed 'label' column.
classifier1 = LogisticRegression(featuresCol='features', labelCol='label')

# Chain feature assembly, label indexing and the classifier so that they are
# fitted and applied as one unit.
# see https://spark.apache.org/docs/latest/ml-pipeline.html
pipeline1 = Pipeline(stages=[assembler1, label_indexer, classifier1])

# Fit all stages on the training split only.
model1 = pipeline1.fit(trainDF)




In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# BinaryClassificationEvaluator reads two input columns, rawPrediction and
# label. areaUnderROC is its default metric; it is spelled out here so the
# reported score is unambiguous.
evaluatorTrain1 = BinaryClassificationEvaluator(metricName='areaUnderROC')

# Score the training rows with the fitted pipeline and show the resulting columns.
predictionsTrain1 = model1.transform(trainDF)
predictionsTrain1.printSchema()

evaluatorTrain1.evaluate(predictionsTrain1)




root
 |-- 'state: string (nullable = true)
 |-- account_length: double (nullable = true)
 |-- area_code: double (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- international_plan: string (nullable = true)
 |-- voice_mail_plan: string (nullable = true)
 |-- number_vmail_messages: double (nullable = true)
 |-- total_day_minutes: double (nullable = true)
 |-- total_day_calls: double (nullable = true)
 |-- total_day_charge: double (nullable = true)
 |-- total_eve_minutes: double (nullable = true)
 |-- total_eve_calls: double (nullable = true)
 |-- total_eve_charge: double (nullable = true)
 |-- total_night_minutes: double (nullable = true)
 |-- total_night_calls: double (nullable = true)
 |-- total_night_charge: double (nullable = true)
 |-- total_intl_minutes: double (nullable = true)
 |-- total_intl_calls: double (nullable = true)
 |-- total_intl_charge: double (nullable = true)
 |-- number_customer_service_calls: double (nullable = true)
 |-- churned': string (nullable = true)

0.7420194416869594

In [20]:
# Same metric as on the training split (areaUnderROC is the evaluator's
# default), this time computed on the held-out test rows.
evaluatorTest1 = BinaryClassificationEvaluator(metricName='areaUnderROC')

predictionsTest1 = model1.transform(testDF)

evaluatorTest1.evaluate(predictionsTest1)




0.7803118176855909

In [21]:
# Repeat the analysis with one feature removed: this assembler keeps the same
# inputs as assembler1 except 'number_customer_service_calls'.
assembler2 = VectorAssembler(
    outputCol='features',
    inputCols=[
        'total_night_minutes',
        'total_day_minutes',
        'total_eve_minutes',
        'account_length',
    ],
)

# Same label indexer and classifier as the first pipeline; only the feature
# assembler differs.
pipeline2 = Pipeline(
    stages=[assembler2, label_indexer, classifier1],
)

# Fit the reduced-feature pipeline on the training split.
model2 = pipeline2.fit(trainDF)






In [22]:
# Training-split score for the reduced-feature model (areaUnderROC is the
# evaluator's default metric, made explicit here).
evaluatorTrain2 = BinaryClassificationEvaluator(metricName='areaUnderROC')

predictionsTrain2 = model2.transform(trainDF)

evaluatorTrain2.evaluate(predictionsTrain2)




0.6430179160765926

In [23]:
# Test-split score for the reduced-feature model (areaUnderROC is the
# evaluator's default metric, made explicit here).
evaluatorTest2 = BinaryClassificationEvaluator(metricName='areaUnderROC')

predictionsTest2 = model2.transform(testDF)

evaluatorTest2.evaluate(predictionsTest2)





0.6686254776200852