# User Churn Prediction

#### Submitted by: Yuhao Wang

Original Data Source:
https://www.sgi.com/tech/mlc/db/churn.all

#### Part 1
Load the data as a Spark DataFrame

In [5]:
# Peek at the raw file to check its layout before parsing it with a schema.
# take(5) returns only the first few lines; collect() would pull the entire
# dataset onto the driver just to display it.
churn_data = sc.textFile("/FileStore/tables/tvcu6dws1495597580790/churn.all")
churn_data.take(5)

In [6]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

# Explicit schema for the comma-separated churn records: identifiers and
# yes/no plan flags are read as strings, usage counters and charges as
# doubles, and the target column 'churned' as a string (indexed later).
schema = StructType([
    StructField("state", StringType(), True),
    StructField("account_length", DoubleType(), True),
    StructField("area_code", StringType(), True),
    StructField("phone_number", StringType(), True),
    StructField("intl_plan", StringType(), True),
    StructField("voice_mail_plan", StringType(), True),
    StructField("number_vmail_messages", DoubleType(), True),
    StructField("total_day_minutes", DoubleType(), True),
    StructField("total_day_calls", DoubleType(), True),
    StructField("total_day_charge", DoubleType(), True),
    StructField("total_eve_minutes", DoubleType(), True),
    StructField("total_eve_calls", DoubleType(), True),
    StructField("total_eve_charge", DoubleType(), True),
    StructField("total_night_minutes", DoubleType(), True),
    StructField("total_night_calls", DoubleType(), True),
    StructField("total_night_charge", DoubleType(), True),
    StructField("total_intl_minutes", DoubleType(), True),
    StructField("total_intl_calls", DoubleType(), True),
    StructField("total_intl_charge", DoubleType(), True),
    StructField("number_customer_service_calls", DoubleType(), True),
    StructField("churned", StringType(), True),
])

# NOTE(review): the schema above supplies every column name, which suggests
# churn.all has no header row. If so, header="true" would silently discard
# the first customer record, so the file is read with header="false".
# Confirm by inspecting the file's first line; if it really is a header,
# switch this back to "true".
churn_data2 = sqlContext.read.format('com.databricks.spark.csv').load(
    '/FileStore/tables/tvcu6dws1495597580790/churn.all',
    schema=schema,
    header="false",
)

#### Part 2
Train a model with a Spark ML Pipeline

In [8]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# Fixed seed so the train/test split and the random forest are reproducible;
# without it every run trains on different rows and reports different metrics.
SEED = 42

# Numeric usage columns fed directly into the feature vector. The schema's
# state, area_code and phone_number columns are not used as features.
reduced_numeric_cols = [
    "account_length", "number_vmail_messages", "total_day_minutes",
    "total_day_calls", "total_day_charge", "total_eve_minutes",
    "total_eve_calls", "total_eve_charge", "total_night_minutes",
    "total_night_calls", "total_night_charge", "total_intl_minutes",
    "total_intl_calls", "total_intl_charge", "number_customer_service_calls",
]

# StringIndexer maps each distinct string to a double index; by default the
# most frequent value gets 0.0. For 'churned' this presumably makes the
# minority churn class 1.0 -- confirm via model.stages[2].labels.
label_indexer = StringIndexer(inputCol='churned', outputCol='label')
plan_indexer = StringIndexer(inputCol='intl_plan', outputCol='intl_plan_indexed')
voice_plan_indexer = StringIndexer(inputCol='voice_mail_plan', outputCol='voice_mail_plan_indexed')

# Indexed categorical flags followed by the raw numeric columns.
assembler = VectorAssembler(
    inputCols=['intl_plan_indexed', 'voice_mail_plan_indexed'] + reduced_numeric_cols,
    outputCol='features',
)
classifier = RandomForestClassifier(labelCol='label', featuresCol='features', seed=SEED)

pipeline = Pipeline(stages=[plan_indexer, voice_plan_indexer, label_indexer, assembler, classifier])

# 80/20 train/test split; the indexers are fitted on the training rows only.
(train, test) = churn_data2.randomSplit([0.8, 0.2], seed=SEED)
model = pipeline.fit(train)
predictions = model.transform(test)

#### Part 3
Evaluate the results

In [10]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics expects an RDD of (prediction, label) pairs, in that order.
# Selecting (label, prediction) instead transposes the confusion matrix, which
# swaps per-class precision and recall. Accuracy and F1 are symmetric under
# that swap, so the mistake would not show up in them.
predictionAndLabels = (
    predictions.select("prediction", "label")
    .rdd
    .map(lambda row: (float(row.prediction), float(row.label)))
)

metrics = MulticlassMetrics(predictionAndLabels)

# Per-class metrics are reported for label 1.0 (see the label StringIndexer
# for which 'churned' value that index corresponds to).
accuracy = metrics.accuracy
precision = metrics.precision(1.0)
recall = metrics.recall(1.0)
f1Score = metrics.fMeasure(1.0)

print("Summary Stats")
print("Accuracy = %s" % accuracy)
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, computed from the classifier's 'rawPrediction'
# scores against the indexed 'label' column. The parameters are spelled out
# rather than relying on the evaluator's defaults.
evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auroc = evaluator.evaluate(predictions)
# print() call form works on both Python 2 and 3; the original statement form
# is a SyntaxError on Python 3.
print("The AUROC is %s." % auroc)