# Description

This Spark program performs churn prediction using Random Forest prediction model.

## Import libraries

In [71]:
import pandas as pd
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from functools import reduce 

## Import data

In [39]:
data = sqlContext.read.load('/user/ebisa/Churn.csv', 
                          delimiter=';',
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')
#data.printSchema()
#data.count()

In [67]:
# distribution of the churn
data.groupBy('Churn').count().toPandas().head()

Unnamed: 0,Churn,count
0,0,2850
1,1,483


## Transform categorical columns into labeled indices

In [57]:
si_label = StringIndexer(inputCol="Churn", outputCol="label", handleInvalid="skip")
si_intPlan = StringIndexer(inputCol="IntlPlan", outputCol="IP", handleInvalid="skip")
si_vmailPlan = StringIndexer(inputCol="VMailPlan", outputCol="VP", handleInvalid="skip")
si_State = StringIndexer(inputCol="State", outputCol="ST", handleInvalid="skip")
pipeline = Pipeline(stages=[si_label, si_intPlan, si_vmailPlan, si_State])

In [58]:
stringIndexedData = pipeline.fit(data).transform(data)

## Make a vector of columns of the features used in prediction

In [42]:
excludedFeatures = ['Churn', "IntlPlan", "VMailPlan", "State", "AreaCode", "Phone"]
features = [x for x in data.columns if x not in excludedFeatures]

In [43]:
vecAssembler = VectorAssembler(inputCols=features, outputCol="features")
featureAssembled = vecAssembler.transform(stringIndexedData)
#featureAssembled.first()

## Divide the data into training and test sets

In [91]:
(train, test) = featureAssembled.randomSplit([0.8, 0.2])
#train.count()
#test.count()

In [1]:
train.groupBy('Churn').count().toPandas().head()

In [2]:
test.groupBy('Churn').count().toPandas().head()

## Compensate for the data imbalance between churning and non-churnnig users

In [72]:
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [94]:
ratio = train.where(train.Churn == 0).count()/train.where(train.Churn == 1).count()
train_1 = train.where(train.Churn == 1)

In [95]:
# append train_1 onto train ratio-1 times to balance between churned and non-churned data
for i in range(0, ratio - 1):
    train = train.unionAll(train_1)

## Build the predictive model and perform a grid search of parameters

In [82]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=3, maxDepth=4, maxBins=200)

In [96]:
# perform a grid search on a set of parameter values

grid = ParamGridBuilder().addGrid(rf.numTrees, [2, 5, 10])\
                         .addGrid(rf.maxDepth, [2, 5, 10])\
                         .build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=rf, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(train)
evaluator.evaluate(cvModel.transform(test))

0.8748577929465309