# Load the data

In [None]:
"""
Data from: https://www.sgi.com/tech/mlc/db/churn.all

Fields:

state: discrete.
account length: continuous.
area code: continuous.
phone number: discrete.
international plan: discrete.
voice mail plan: discrete.
number vmail messages: continuous.
total day minutes: continuous.
total day calls: continuous.
total day charge: continuous.
total eve minutes: continuous.
total eve calls: continuous.
total eve charge: continuous.
total night minutes: continuous.
total night calls: continuous.
total night charge: continuous.
total intl minutes: continuous.
total intl calls: continuous.
total intl charge: continuous.
number customer service calls: continuous.
churned: discrete. (class label; last column of each row, see the schema below)
"""

from pyspark.sql import SQLContext
from pyspark.sql.types import *

# `sc` is not defined in this notebook; presumably it is the SparkContext
# provided by the PySpark session the notebook runs in.
sqlContext = SQLContext(sc)

# Explicit schema for churn.all: one (column name, Spark type) pair per CSV
# column, in file order. Every column is declared nullable. state, area_code,
# phone_number, the two plan columns and churned are read as strings; all
# other columns are read as doubles.
schema = StructType([
    StructField(column_name, column_type(), True)
    for column_name, column_type in [
        ("state", StringType),
        ("account_length", DoubleType),
        ("area_code", StringType),
        ("phone_number", StringType),
        ("international_plan", StringType),
        ("voice_mail_plan", StringType),
        ("number_vmail_messages", DoubleType),
        ("total_day_minutes", DoubleType),
        ("total_day_calls", DoubleType),
        ("total_day_charge", DoubleType),
        ("total_eve_minutes", DoubleType),
        ("total_eve_calls", DoubleType),
        ("total_eve_charge", DoubleType),
        ("total_night_minutes", DoubleType),
        ("total_night_calls", DoubleType),
        ("total_night_charge", DoubleType),
        ("total_intl_minutes", DoubleType),
        ("total_intl_calls", DoubleType),
        ("total_intl_charge", DoubleType),
        ("number_customer_service_calls", DoubleType),
        ("churned", StringType),
    ]
])

# Load the file through the spark-csv data source, applying the schema above
# rather than letting the reader infer one.
csv_reader = sqlContext.read.format('com.databricks.spark.csv')
df = csv_reader.load('churn.all', schema=schema)
    
# TODO: remove extraneous spaces from file

In [None]:
# Fetch the first five rows of df as a quick check of how the file was parsed.
df.take(5)

# Assemble feature vectors

In [None]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack the five selected numeric columns into one vector column called
# 'features'; the vector's element order follows the inputCols order.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'number_customer_service_calls',
        'total_night_minutes',
        'total_day_minutes',
        'total_eve_minutes',
        'account_length',
    ])

# transform() leaves df untouched and returns a DataFrame that additionally
# carries the assembled 'features' column.
feature_df = assembler.transform(df)

# Transform labels

In [None]:
from pyspark.sql.functions import udf

def _churn_label(raw_value):
    """Map a raw `churned` cell to a numeric label: 1.0 if churned, else 0.0.

    Surrounding whitespace is ignored, so both " True." (the form the
    unstripped file currently yields) and "True." (the form it will yield once
    the extraneous spaces are removed from the file) count as churned.
    A missing value (None) maps to 0.0, matching what the earlier exact-match
    comparison produced for it.
    """
    if raw_value is None:
        return 0.0
    return 1.0 if raw_value.strip() == "True." else 0.0


# Wrap the plain-Python mapper as a Spark SQL UDF that returns a double.
label_func = udf(_churn_label, DoubleType())

# Keep only the two columns used for training: the assembled feature vector
# and the numeric label derived from the `churned` column of the same frame.
train_set = feature_df.select(
    'features',
    label_func(feature_df['churned']).alias('label'))

In [None]:
# Fetch the first five (features, label) rows to eyeball the training data.
train_set.take(5)

In [None]:
from pyspark.ml.classification import LogisticRegression

# Mark the training rows for caching before fitting, so the optimizer's
# repeated passes (up to maxIter) can reuse them instead of recomputing.
train_set.cache()

# Logistic regression with regularization strength 0.01, capped at 10
# optimizer iterations; all other parameters keep their defaults.
lr = LogisticRegression(regParam=0.01, maxIter=10)
model = lr.fit(train_set)

In [None]:
# Newer PySpark releases expose the fitted vector as `coefficients` and
# deprecate the older `weights` name; use the new attribute when the model has
# it and fall back to `weights` on releases that predate it.
coefficients = (model.coefficients if hasattr(model, 'coefficients')
                else model.weights)

# One coefficient per assembled feature, followed by the bias term.
print(coefficients)
print(model.intercept)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator constructed with its defaults; the metric is whatever
# BinaryClassificationEvaluator defaults to (areaUnderROC per the PySpark
# docs - confirm for the Spark version in use).
evaluator = BinaryClassificationEvaluator()

# NOTE: these are the same rows the model was fitted on, so the score below
# measures fit to the training data, not performance on unseen data.
predictions = model.transform(train_set)

evaluator.evaluate(predictions)