# Predicting Churn for one of the Telecom Company

In [1]:
import findspark
findspark.init()

import pyspark

from pyspark import SparkContext

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc=SparkContext('local','e1')  # if using locally
sql_sc=SQLContext(sc)

# reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.

pandas_df=pd.read_csv('G:/Big_data_homework/churn.csv')  # assuming the file contains a header 
                                        #pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) -> if no header
# 
df=sql_sc.createDataFrame(pandas_df)

In [3]:
df.head(1)

[Row(state='KS', account length=128, area code=415, phone number='382-4657', international plan='no', voice mail plan='yes', number vmail messages=25, total day minutes=265.1, total day calls=110, total day charge=45.07, total eve minutes=197.4, total eve calls=99, total eve charge=16.78, total night minutes=244.7, total night calls=91, total night charge=11.01, total intl minutes=10.0, total intl calls=3, total intl charge=2.7, customer service calls=1, churn=False)]

In [33]:
df.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: long (nullable = true)
 |-- area code: long (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: long (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: long (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: long (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: long (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: long (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: long (nullable = true)
 |-- churn: boolean (nullable = true)



In [28]:
from pyspark.sql.functions import col
d1=df.withColumn('churn',pyspark.sql.functions.when(df.churn =='False',0).when(df.churn =='True',1).otherwise(''))

In [48]:
from pyspark.sql.functions import col
from pyspark.sql.types import *

d1 = df.withColumn('churn',df['churn'].cast(IntegerType()))

In [49]:
d1.printSchema()

root
 |-- state: string (nullable = true)
 |-- account length: long (nullable = true)
 |-- area code: long (nullable = true)
 |-- phone number: string (nullable = true)
 |-- international plan: string (nullable = true)
 |-- voice mail plan: string (nullable = true)
 |-- number vmail messages: long (nullable = true)
 |-- total day minutes: double (nullable = true)
 |-- total day calls: long (nullable = true)
 |-- total day charge: double (nullable = true)
 |-- total eve minutes: double (nullable = true)
 |-- total eve calls: long (nullable = true)
 |-- total eve charge: double (nullable = true)
 |-- total night minutes: double (nullable = true)
 |-- total night calls: long (nullable = true)
 |-- total night charge: double (nullable = true)
 |-- total intl minutes: double (nullable = true)
 |-- total intl calls: long (nullable = true)
 |-- total intl charge: double (nullable = true)
 |-- customer service calls: long (nullable = true)
 |-- churn: integer (nullable = true)



In [50]:
features1=d1.select(['account length','international plan','voice mail plan','number vmail messages','total day minutes',
                    'total day calls','total day charge','total eve minutes','total eve calls','total eve charge',
                    'total night minutes','total night calls','total night charge','total intl minutes','total intl calls',
                    'total intl charge','customer service calls','churn'])

In [51]:
from pyspark.ml.feature import VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer

# check the below 3 lines
idx1=StringIndexer(inputCol='international plan',outputCol='i1').fit(features1).transform(features1)
idx2=StringIndexer(inputCol='voice mail plan',outputCol='i2').fit(idx1).transform(idx1)

# this is correct code but wrong input
encd1=OneHotEncoder(inputCol='i1',outputCol='int_plan')
d2=encd1.fit(idx2).transform(idx2)
encd2=OneHotEncoder(inputCol='i2',outputCol='vmail_plan')
d3=encd2.fit(d2).transform(d2)

In [52]:
d3.head(3)

[Row(account length=128, international plan='no', voice mail plan='yes', number vmail messages=25, total day minutes=265.1, total day calls=110, total day charge=45.07, total eve minutes=197.4, total eve calls=99, total eve charge=16.78, total night minutes=244.7, total night calls=91, total night charge=11.01, total intl minutes=10.0, total intl calls=3, total intl charge=2.7, customer service calls=1, churn=0, i1=0.0, i2=1.0, int_plan=SparseVector(1, {0: 1.0}), vmail_plan=SparseVector(1, {})),
 Row(account length=107, international plan='no', voice mail plan='yes', number vmail messages=26, total day minutes=161.6, total day calls=123, total day charge=27.47, total eve minutes=195.5, total eve calls=103, total eve charge=16.62, total night minutes=254.4, total night calls=103, total night charge=11.45, total intl minutes=13.7, total intl calls=3, total intl charge=3.7, customer service calls=1, churn=0, i1=0.0, i2=1.0, int_plan=SparseVector(1, {0: 1.0}), vmail_plan=SparseVector(1, {}

In [53]:
assembler=VectorAssembler(inputCols=['account length','number vmail messages','total day minutes',
                    'total day calls','total day charge','total eve minutes','total eve calls','total eve charge',
                    'total night minutes','total night calls','total night charge','total intl minutes','total intl calls',
                    'total intl charge','customer service calls','int_plan','vmail_plan'],outputCol='feature')

In [54]:
from pyspark.ml.classification import LogisticRegression

train_data,test_data =d3.randomSplit([0.7,.3])

model=LogisticRegression(featuresCol='feature',labelCol='churn')

In [55]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler,model])

In [56]:
fit_model = pipeline.fit(train_data)
results = fit_model.transform(test_data)

In [65]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='churn')
results.select('churn','prediction').show()

+-----+----------+
|churn|prediction|
+-----+----------+
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [66]:
AUC=my_eval.evaluate(results)
AUC

0.5919250645994832