# Load the data

We need to load data from a file into a Spark DataFrame.
Each row is an observed customer, and each column contains
attributes of that customer.

    Fields:
    state: discrete.
    account length: numeric.
    area code: numeric.
    phone number: discrete.
    international plan: discrete.
    voice mail plan: discrete.
    number vmail messages: numeric.
    total day minutes: numeric.
    total day calls: numeric.
    total day charge: numeric.
    total eve minutes: numeric.
    total eve calls: numeric.
    total eve charge: numeric.
    total night minutes: numeric.
    total night calls: numeric.
    total night charge: numeric.
    total intl minutes: numeric.
    total intl calls: numeric.
    total intl charge: numeric.
    number customer service calls: numeric.
    churned: discrete.

'Numeric' and 'discrete' do not adequately describe the fundamental differences in the attributes.

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext(sc)

# Column names in file order. Columns listed in `string_columns` are read as
# strings; every other column is read as a double. All columns are nullable.
string_columns = {
    "state",
    "area_code",
    "phone_number",
    "intl_plan",
    "voice_mail_plan",
    "churned",
}
column_names = [
    "state",
    "account_length",
    "area_code",
    "phone_number",
    "intl_plan",
    "voice_mail_plan",
    "number_vmail_messages",
    "total_day_minutes",
    "total_day_calls",
    "total_day_charge",
    "total_eve_minutes",
    "total_eve_calls",
    "total_eve_charge",
    "total_night_minutes",
    "total_night_calls",
    "total_night_charge",
    "total_intl_minutes",
    "total_intl_calls",
    "total_intl_charge",
    "number_customer_service_calls",
    "churned",
]
schema = StructType(list(
    StructField(
        column_name,
        StringType() if column_name in string_columns else DoubleType(),
        True,
    )
    for column_name in column_names
))

# Read the CSV file with the explicit schema above instead of inferring types.
csv_reader = sqlContext.read.format('com.databricks.spark.csv')
churn_df = csv_reader.load(
    "/FileStore/tables/fyarcesz1457233855557/churn.all",
    schema=schema,
)

In [3]:
# Total number of rows in the loaded DataFrame.
count = churn_df.count()

# Rows whose voice_mail_plan equals " yes". The leading space in the literal
# is intentional: it is the exact value this cell has always compared against.
# (Fixed: the filter previously referenced `churn_data`, which is not defined
# anywhere above; the DataFrame loaded earlier is `churn_df`.)
voice_mail_plans = churn_df.filter(churn_df.voice_mail_plan == " yes").count()

"%d, %d" % (count, voice_mail_plans)

Intuitively, we would want to know how many customers made one or more customer service calls, and how many of them made more than one.

In [5]:
# Column reference for the 'churned' field (no later visible cell uses it).
churn_result = churn_df.churned

In [6]:
# Count the rows with at least one customer service call.
# NOTE(review): the variable name says "multiple", but the threshold is > 0
# (one or more calls) - confirm whether > 1 was intended.
multiple_service_calls = churn_df.where(churn_df["number_customer_service_calls"] > 0).count()

In [7]:
# Display the count computed in the previous cell.
multiple_service_calls

In [8]:
# Draw a 50% sample without replacement (fixed seed 83) and collect it to the
# driver as a pandas DataFrame for quick inspection.
sample_data = churn_df.sample(withReplacement=False, fraction=0.5, seed=83).toPandas()

# Show the first few sampled rows.
sample_data.head()

# Feature Extraction and Model Training

* Encode features that are not already numeric
* Gather all features we need into a single column in the DataFrame.
* Split labeled data into training and testing sets
* Fit the model to the training data.

## Feature Extraction
We need to define our input features.

In [10]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler

# Index the string-valued international-plan flag so it can be used as a feature.
plan_indexer = StringIndexer(inputCol='intl_plan', outputCol='intl_plan_indexed')

# Index the string-valued 'churned' column into the 'label' column.
label_indexer = StringIndexer(inputCol='churned', outputCol='label')

# Gather the indexed plan flag plus the selected numeric columns into a single
# 'features' vector column.
# NOTE(review): `reduced_numeric_cols` is not defined in any visible cell; it
# must be defined before this cell runs - confirm where it comes from.
feature_cols = ['intl_plan_indexed'] + reduced_numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

## Model Training

We can now define our classifier and pipeline. With this done, we can split our labeled data into training and test sets and fit a model.

# DecisionTreeClassifier

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree trained on the assembled 'features' column to predict 'label'.
classifier = DecisionTreeClassifier(labelCol='label', featuresCol='features')

# Stages run in order: index the plan flag, index the label, assemble the
# feature vector, then fit the classifier.
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])

# 70/30 train/test split of the loaded data.
# Fixed: the split previously referenced `churn_data`, which is not defined
# anywhere above; the DataFrame loaded earlier is `churn_df`.
# A fixed seed keeps the split - and therefore the reported metrics -
# reproducible across Restart & Run All.
train, test = churn_df.randomSplit([0.7, 0.3], seed=83)
model = pipeline.fit(train)

## Model Evaluation

In [15]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out rows with the fitted pipeline.
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()

# Evaluate the same predictions under both metrics by overriding the
# evaluator's metricName parameter for each call.
auroc, aupr = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("areaUnderROC", "areaUnderPR")
)
"The AUROC is %s and the AUPR is %s." % (auroc, aupr)

# RandomForestClassifier

In [17]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest of 100 entropy-split trees, each limited to depth 10, trained
# on the assembled 'features' column to predict 'label'.
classifier = RandomForestClassifier(
    labelCol='label',
    featuresCol='features',
    maxDepth=10,
    numTrees=100,
    impurity='entropy',
)

# Same preprocessing stages as before, with the random forest as the final stage.
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])

# 70/30 train/test split of the loaded data.
# Fixed: the split previously referenced `churn_data`, which is not defined
# anywhere above; the DataFrame loaded earlier is `churn_df`.
# A fixed seed keeps the split - and therefore the reported metrics -
# reproducible across Restart & Run All.
train, test = churn_df.randomSplit([0.7, 0.3], seed=83)

model = pipeline.fit(train)

## Model Evaluation

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out rows with the most recently fitted pipeline.
predictions = model.transform(test)
evaluator = BinaryClassificationEvaluator()

# Evaluate the same predictions under both metrics by overriding the
# evaluator's metricName parameter for each call.
auroc, aupr = (
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("areaUnderROC", "areaUnderPR")
)
"The AUROC is %s and the AUPR is %s." % (auroc, aupr)