In [None]:
from pyspark import SparkContext

# getOrCreate() returns the already-running context if there is one, so
# re-executing this cell does not fail with
# "Cannot run multiple SparkContexts at once" (as a bare SparkContext() would).
sc = SparkContext.getOrCreate()

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

sqlContext = SQLContext(sc)

# Column layout for data/churn.csv. The order must match the file's column
# order. Every field is nullable; identifier/categorical columns are strings
# and all other columns are doubles.
churn_columns = [
    ("state", StringType()),
    ("account_length", DoubleType()),
    ("area_code", StringType()),
    ("phone_number", StringType()),
    ("intl_plan", StringType()),
    ("voice_mail_plan", StringType()),
    ("number_vmail_messages", DoubleType()),
    ("total_day_minutes", DoubleType()),
    ("total_day_calls", DoubleType()),
    ("total_day_charge", DoubleType()),
    ("total_eve_minutes", DoubleType()),
    ("total_eve_calls", DoubleType()),
    ("total_eve_charge", DoubleType()),
    ("total_night_minutes", DoubleType()),
    ("total_night_calls", DoubleType()),
    ("total_night_charge", DoubleType()),
    ("total_intl_minutes", DoubleType()),
    ("total_intl_calls", DoubleType()),
    ("total_intl_charge", DoubleType()),
    ("number_customer_service_calls", DoubleType()),
    ("churned", StringType()),
]
schema = StructType([StructField(col_name, col_type, True)
                     for col_name, col_type in churn_columns])

# Load the CSV using the explicit schema defined above.
churn_data = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .load('data/churn.csv', schema=schema)
)

In [None]:
# Total number of rows, and how many customers have a voice-mail plan.
# The total is deliberately NOT called `count`: that name would shadow
# pyspark.sql.functions.count, which the star import in the previous cell
# brings into the notebook namespace.
n_customers = churn_data.count()
# The comparison literal includes a leading space (" yes"), presumably because
# the CSV has a space after each delimiter -- keep it in sync with the data.
voice_mail_plans = churn_data.filter(churn_data.voice_mail_plan == " yes").count()
print('Count: {0}, voice_mail_plans: {1}'.format(n_customers, voice_mail_plans))

In [None]:
# Spark summary statistics, pulled into pandas and transposed so that each
# column of churn_data appears as one row of the displayed table.
churn_data.describe().toPandas().T

In [None]:
from pyspark.sql.functions import *

GroupBy aggregation, join, and column manipulation: the share of customers with a voice-mail plan in each state.

In [None]:
# Per-state share of customers with a voice-mail plan ('vmp').
states = churn_data.groupBy("state").count()
states_vm = (churn_data
             .filter(churn_data.voice_mail_plan == " yes")
             .groupBy("state")
             .count()
             .withColumnRenamed('count', 'count_vm'))
# Left join on the key name so that states where no customer has a voice-mail
# plan are kept. An inner join would silently drop them. Joining on the
# column name also yields a single 'state' column, so no drop() is needed.
# Missing count_vm values for those states become 0.
joined_df = (states
             .join(states_vm, on='state', how='left')
             .fillna(0, subset=['count_vm']))
df = joined_df.select(joined_df['state'],
                      (joined_df['count_vm'] / joined_df['count']).alias('vmp'))
df.show()

Take a random sample of the Spark DataFrame and convert it to a pandas DataFrame for plotting.
Signature: `df.sample(withReplacement, fraction, seed=None)`

In [None]:
# Reproducible 50% sample without replacement (seed 83), collected to pandas.
sample_data = churn_data.sample(withReplacement=False, fraction=0.5, seed=83).toPandas()
sample_data.head()

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sb
import pandas as pd

In [None]:
# Histogram (no KDE curve) of customer-service calls in the pandas sample.
# NOTE: seaborn deprecated distplot in 0.11; on newer seaborn the equivalent is
# sb.histplot(data=sample_data, x='number_customer_service_calls').
ax = sb.distplot(sample_data['number_customer_service_calls'], kde=False)
ax.set(title='Customer service calls (sampled customers)',
       xlabel='number_customer_service_calls');

In [None]:
# Spread of customer-service calls for each churn outcome.
sb.boxplot(data=sample_data, x="churned", y="number_customer_service_calls")

Separating columns by type.

In [None]:
# Column groups by type. Every name here must match a column of the schema
# used to load churn_data.
# NOTE(review): number_customer_service_calls is also a DoubleType column in
# the schema but appears in neither numeric list -- confirm that is intended.
numeric_cols = ["account_length", "number_vmail_messages", "total_day_minutes",
                "total_day_calls", "total_day_charge", "total_eve_minutes",
                "total_eve_calls", "total_eve_charge", "total_night_minutes",
                "total_night_calls", "total_night_charge", "total_intl_minutes",
                "total_intl_calls", "total_intl_charge"]
# Subset used as model features: the *_minutes columns are dropped, presumably
# because each period's charge tracks its minutes (not verified here).
# NOTE(review): total_night_charge is missing here although the day, eve and
# intl charges are kept. It is left out on purpose in this edit, because adding
# it would change the model's features and results.
reduced_numeric_cols = ["account_length", "number_vmail_messages", "total_day_calls",
                        "total_day_charge", "total_eve_calls", "total_eve_charge",
                        "total_night_calls", "total_intl_calls", "total_intl_charge"]
# The international-plan column is named 'intl_plan' in the schema
# ('international_plan' is not a column of churn_data).
categorical_cols = ["state", "intl_plan", "voice_mail_plan", "area_code"]

Preparing data for classifiers

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

# String target column 'churned' -> numeric 'label' for the classifiers.
label_indexer = StringIndexer(inputCol='churned', outputCol='label')
# String international-plan flag -> numeric feature column.
plan_indexer = StringIndexer(inputCol='intl_plan', outputCol='intl_plan_indexed')

# Feature vector: the indexed plan flag followed by the reduced numeric columns.
feature_cols = ['intl_plan_indexed'] + reduced_numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

Decision Tree

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

# Seed for the 70/30 train/test split. Without it, every Restart & Run All
# produces a different split and therefore different evaluation metrics.
SPLIT_SEED = 201

classifier = DecisionTreeClassifier(labelCol='label', featuresCol='features')

# Stage order matters:
#   - Both indexers run before the assembler, which reads 'intl_plan_indexed'.
#   - The classifier reads the 'label' and 'features' columns.
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])

(train, test) = churn_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
model = pipeline.fit(train)

Model Evaluation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out split with the fitted pipeline.
predictions = model.transform(test)

# One evaluator, queried once per metric by overriding metricName for that call.
evaluator = BinaryClassificationEvaluator()
scores = {
    metric: evaluator.evaluate(predictions, {evaluator.metricName: metric})
    for metric in ("areaUnderROC", "areaUnderPR")
}
auroc, aupr = scores["areaUnderROC"], scores["areaUnderPR"]
print('The AUROC is {0} and the AUPR is {1}'.format(auroc, aupr))

Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random-forest hyperparameters.
RF_NUM_TREES = 500
RF_SEED = 201

# NOTE: classifier / pipeline / model / predictions / evaluator are rebound here,
# so the decision-tree objects of the same names are no longer reachable.
classifier = RandomForestClassifier(labelCol='label', featuresCol='features',
                                    numTrees=RF_NUM_TREES, seed=RF_SEED)
# Same preprocessing stages as the decision tree; only the final estimator differs.
pipeline = Pipeline(stages=[plan_indexer, label_indexer, assembler, classifier])

# Reuse the existing train/test split (no re-split here) so both models are
# evaluated on the same held-out rows.
model = pipeline.fit(train)
predictions = model.transform(test)

evaluator = BinaryClassificationEvaluator()
auroc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})
aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
print('The AUROC is {0} and the AUPR is {1}'.format(auroc, aupr))

For comparison, a run with numTrees=20 gave AUROC = 0.8034334855914316 and AUPR = 0.6880744080071193.