In [1]:
import os

import findspark

# Locate the Spark installation before anything from pyspark is imported.
# An exported SPARK_HOME takes precedence so the notebook can run on other
# machines; when it is unset or empty, fall back to the original local path.
findspark.init(os.environ.get('SPARK_HOME') or '/home/aditya/spark-3.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession

In [2]:
# Obtain the Spark session for this notebook, registered under the app name "Lreg".
spark = (
    SparkSession.builder
    .appName("Lreg")
    .getOrCreate()
)

In [3]:
from pyspark.ml.classification import LogisticRegression

In [4]:
# Load the churn dataset; the first row supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    'churn.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Show the column names and the types that were inferred when the CSV was read.
df.printSchema()

root
 |-- State: string (nullable = true)
 |-- Account Length: integer (nullable = true)
 |-- Area Code: integer (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Int'l Plan: string (nullable = true)
 |-- VMail Plan: string (nullable = true)
 |-- VMail Message: integer (nullable = true)
 |-- Day Mins: double (nullable = true)
 |-- Day Calls: integer (nullable = true)
 |-- Day Charge: double (nullable = true)
 |-- Eve Mins: double (nullable = true)
 |-- Eve Calls: integer (nullable = true)
 |-- Eve Charge: double (nullable = true)
 |-- Night Mins: double (nullable = true)
 |-- Night Calls: integer (nullable = true)
 |-- Night Charge: double (nullable = true)
 |-- Intl Mins: double (nullable = true)
 |-- Intl Calls: integer (nullable = true)
 |-- Intl Charge: double (nullable = true)
 |-- CustServ Calls: integer (nullable = true)
 |-- Churn?: string (nullable = true)



In [13]:
from pyspark.sql.functions import countDistinct

# Profile every column on its own: first the summary table
# (count / mean / stddev / min / max), then the number of distinct values.
for column_name in df.columns:
    summary_stats = df.select(column_name).describe()
    summary_stats.show()
    distinct_count = df.select(countDistinct(column_name))
    distinct_count.show()

+-------+-----+
|summary|State|
+-------+-----+
|  count| 3333|
|   mean| null|
| stddev| null|
|    min|   AK|
|    max|   WY|
+-------+-----+

+---------------------+
|count(DISTINCT State)|
+---------------------+
|                   51|
+---------------------+

+-------+------------------+
|summary|    Account Length|
+-------+------------------+
|  count|              3333|
|   mean|101.06480648064806|
| stddev|39.822105928595676|
|    min|                 1|
|    max|               243|
+-------+------------------+

+------------------------------+
|count(DISTINCT Account Length)|
+------------------------------+
|                           212|
+------------------------------+

+-------+------------------+
|summary|         Area Code|
+-------+------------------+
|  count|              3333|
|   mean|437.18241824182417|
| stddev| 42.37129048560661|
|    min|               408|
|    max|               510|
+-------+------------------+

+-------------------------+
|count(DISTINCT 

In [15]:
# Columns carried forward for modelling: everything in the schema except 'Phone'.
selected_columns = [
    'State',
    'Account Length',
    'Area Code',
    "Int'l Plan",
    'VMail Plan',
    'VMail Message',
    'Day Mins',
    'Day Calls',
    'Day Charge',
    'Eve Mins',
    'Eve Calls',
    'Eve Charge',
    'Night Mins',
    'Night Calls',
    'Night Charge',
    'Intl Mins',
    'Intl Calls',
    'Intl Charge',
    'CustServ Calls',
    'Churn?',
]
mydf = df.select(selected_columns)

# Discard rows that contain nulls before any feature engineering.
myfinaldf = mydf.na.drop()

In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# State: index the string values, then one-hot encode the index.
state_indexer = StringIndexer(
    inputCol='State',
    outputCol='StateIndexed',
)
state_encoder = OneHotEncoder(
    inputCol='StateIndexed',
    outputCol='stateVec',
)

# Area Code: handled as a categorical value the same way as State.
area_indexer = StringIndexer(
    inputCol='Area Code',
    outputCol='AreaIndexed',
)
area_encoder = OneHotEncoder(
    inputCol='AreaIndexed',
    outputCol='areaVec',
)

# The two plan columns are only indexed; their indices go into the feature
# vector directly, with no one-hot encoding step.
int_plan_indexer = StringIndexer(
    inputCol="Int'l Plan",
    outputCol='IntplanIndexed',
)
vmail_indexer = StringIndexer(
    inputCol="VMail Plan",
    outputCol='vmailIndexed',
)

# Label column: the indexed churn flag is what the classifier is trained on.
churn_indexer = StringIndexer(
    inputCol="Churn?",
    outputCol='ChurnIndexed',
)

# Order matters here: it fixes the layout of the assembled feature vector.
feature_columns = [
    'stateVec',
    'Account Length',
    'areaVec',
    "IntplanIndexed",
    'vmailIndexed',
    'VMail Message',
    'Day Mins',
    'Day Calls',
    'Day Charge',
    'Eve Mins',
    'Eve Calls',
    'Eve Charge',
    'Night Mins',
    'Night Calls',
    'Night Charge',
    'Intl Mins',
    'Intl Calls',
    'Intl Charge',
    'CustServ Calls',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [17]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

In [18]:
# Classifier: reads the assembled 'features' vector and learns the indexed churn label.
log_reg = LogisticRegression(
    labelCol='ChurnIndexed',
    featuresCol='features',
)

In [19]:
# Stages run in list order: categorical indexing/encoding, label indexing,
# feature assembly, and finally the logistic regression estimator.
pipeline_stages = [
    state_indexer,
    state_encoder,
    area_indexer,
    area_encoder,
    int_plan_indexer,
    vmail_indexer,
    churn_indexer,
    assembler,
    log_reg,
]
mlpipeline = Pipeline(stages=pipeline_stages)

In [20]:
# Split the cleaned data into train (70%) and test (30%) sets.
# The seed is fixed so that the same rows land in each split on every run,
# which lets the evaluation score further down be reproduced.
train_data, test_data = myfinaldf.randomSplit([0.7, 0.3], seed=42)

In [21]:
# Fit the whole pipeline (indexers, encoders, assembler, logistic regression)
# on the training split only.
mymodel = mlpipeline.fit(train_data)

In [22]:
# Run the fitted pipeline over the held-out test split.
results = mymodel.transform(test_data)

In [23]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [28]:
# Evaluator that compares the model's raw prediction column against the
# indexed churn label.
myeval = BinaryClassificationEvaluator(
    labelCol='ChurnIndexed',
    rawPredictionCol="rawPrediction",
)

In [29]:
# Score the test-set predictions. No metricName was passed to the evaluator,
# so this is its default metric — presumably area under the ROC curve; confirm
# against the Spark docs for the version in use.
myeval.evaluate(results)

0.7866068149357215