 # Loading and Cleaning

First we need to load the data as a CSV. Note I also showed a row for simplicity to understand what we're looking at.

In [3]:
from pyspark.sql.functions import *

data = sqlContext.read.format('com.databricks.spark.csv') \
.option("header",True) \
.option("inferSchema",True) \
.load('file:/home/cloudera/Big Data Project/final_with_churn.csv')

In [4]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=,0,16,\N,\N,\N,\N,\N,\N,\N,30.0,99.0,99.0,20170306,20151107,\N,\N,\N,\N,0


Then we want to check the schema.

In [5]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: integer (nullable = true)
 |-- min_transaction_date: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)



Notice that sum of the numbers came in as strings due to NA values. We're going to have to clean next so there are 2 steps to this:

1. Filling \N's in the appropriate columsns with 0. We will also want to convert the strings in these fields to floats.
    - these columns are
2. Dropping rows that aren't relevant because they have missing data.
    - these columns are: city, age, gender, registered via


Let's also get a row count so we have some idea of proportionality moving forward.

In [6]:
data.select('user_id').count()

992931

Before starting 1, lets check how many are missing from these columns. Note that I assume if one column is missing they are all misssing as they all were pulled from the same table.

In [7]:
data.select('total_num_25').groupby('total_num_25').count().sort('count', ascending = False).limit(5).show()

+------------+------+
|total_num_25| count|
+------------+------+
|          \N|123005|
|           0|  2684|
|          36|   845|
|          72|   776|
|          60|   751|
+------------+------+



So we have 123k missing the values pulled from the usage. That being said, since these users were present in other tables I'm going to make the assumption they were registered but for whatever reason they did not use their account actively. In this case, we will leave them in, but fill these usage related values as 0's in our next step.

In [8]:
#1
data = data.withColumn('total_num_25', \
                      when(data['total_num_25']=='\N', '0.0').otherwise(data['total_num_25']))

data = data.withColumn('total_num_50', \
                      when(data['total_num_50']=='\N', '0.0').otherwise(data['total_num_50']))

data = data.withColumn('total_num_75', \
                      when(data['total_num_75']=='\N', '0.0').otherwise(data['total_num_75']))

data = data.withColumn('total_num_985', \
                      when(data['total_num_985']=='\N', '0.0').otherwise(data['total_num_985']))

data = data.withColumn('total_num_100', \
                      when(data['total_num_100']=='\N', '0.0').otherwise(data['total_num_100']))

data = data.withColumn('avg_unique_songs', \
                      when(data['avg_unique_songs']=='\N', '0.0').otherwise(data['avg_unique_songs']))

data = data.withColumn('avg_total_secs', \
                      when(data['avg_total_secs']=='\N', '0.0').otherwise(data['avg_total_secs']))

Checking the first row to make sure the above corrections worked.

In [9]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=,0,16,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0,99.0,99.0,20170306,20151107,\N,\N,\N,\N,0


Next we will try to drop rows missing some values... (city, age, gender, registered via) as these are important attributes we want in our analysis. First lets see how many rows are missing these values; note taht i assumed if its missing for city its missing for gender/age/registered via as well since they all pulled from the same table.

In [10]:
data.select('age').groupby('age').count().sort('count', ascending = False).limit(5).show()

+---+------+
|age| count|
+---+------+
|  0|433567|
| \N|296297|
| 27| 15344|
| 26| 14611|
| 25| 13778|
+---+------+



296,297 is a little more than a quarter of rows misssing these values; its a substantial number but since these attributes were from their customer table, I find it a bit odd that the customer table is missing this data while the payment table still has it. From earlier analysis, I could also see a lot of these rows were also missing usage data. Since we are testing the capabilities of pyspark, I am going to make the executive decision to drop these rows; we will still have a viable model built off a large quantity of data without them and will be able to drive better insights with the data not being so present.

In [11]:
data = data.filter(data.age != '\N')

Checking first row to make sure it worked... 

In [12]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++Bks8kE9oclzxZM3hcWs+qzsxuoXFeIE1+7pxKBCQg=,624,624,2408,312,264,360,18864,20.76923,6836.1167,30.0,99.0,99.0,20170331,20160730,1,0,,7,0


Checking counts again to make sure all NA are gone...

Note that some are still null for female/male but since they were included in the rest the table we will leave that in. Perhaps the empty value for this field is because including gender is optional, and maybe our model will take that input as some type of value (People who don't put down there gender are more likely to churn???).

In [13]:
data = data.withColumn('gender',when(
data['gender']=='', 'Not_Provided').otherwise(data['gender']))

data.select('gender').groupby('gender').count().sort('count', ascending = False).limit(5).show()

+------------+------+
|      gender| count|
+------------+------+
|Not_Provided|430471|
|        male|141045|
|      female|125118|
+------------+------+



Final thing we have to do is find the date difference between the max_expiration_date and the min_transaction_date.

First we need to convert both to dates.

In [14]:
data = data.withColumn('max_expiration_date',data.max_expiration_date.cast("string"))
data = data.withColumn('min_transaction_date',data.min_transaction_date.cast("string"))

data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: string (nullable = true)
 |-- min_transaction_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)



In [15]:
data.registerTempTable('data')

data.select('max_expiration_date','min_transaction_date').limit(5).show()

+-------------------+--------------------+
|max_expiration_date|min_transaction_date|
+-------------------+--------------------+
|           20170331|            20160730|
|           20170311|            20150914|
|           20170316|            20150131|
|           20170331|            20150121|
|           20170305|            20150410|
+-------------------+--------------------+



In [16]:
#building the correct date vectors
max_exp_date = sqlContext.sql("""
    SELECT TO_DATE(CAST(UNIX_TIMESTAMP(max_expiration_date,'yyyyMMdd') AS TIMESTAMP)) AS max_exp_date
    FROM DATA
""")

min_trans_date = sqlContext.sql("""
    SELECT TO_DATE(CAST(UNIX_TIMESTAMP(min_transaction_date,'yyyyMMdd') AS TIMESTAMP)) AS min_trans_date
    FROM DATA
""")

In [17]:
max_exp_date.limit(5).show()

+------------+
|max_exp_date|
+------------+
|  2017-03-31|
|  2017-03-11|
|  2017-03-16|
|  2017-03-31|
|  2017-03-05|
+------------+



In [18]:
min_trans_date.limit(5).show()

+--------------+
|min_trans_date|
+--------------+
|    2016-07-30|
|    2015-09-14|
|    2015-01-31|
|    2015-01-21|
|    2015-04-10|
+--------------+



In [19]:
#merging these into the data dataFrame

data = data.withColumn('row_index', monotonically_increasing_id())
max_exp_date = max_exp_date.withColumn('row_index', monotonically_increasing_id())
min_trans_date = min_trans_date.withColumn('row_index', monotonically_increasing_id())

data = data.join(max_exp_date, on=['row_index'])
data = data.join(min_trans_date, on=['row_index']).drop('row_index')

In [20]:
#make date diff column

#for some reason tihs is all 0s.... fix somehow. the min trans date is coming in wrong
data = data.withColumn('date_diff', datediff(data.max_exp_date,data.min_trans_date))

In [21]:
data.limit(5).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,...,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn,max_exp_date,min_trans_date,date_diff
0,+1ZgRw2ZlmD4Z1NCVo8lh4ECpNtG73bp/cECvhq4l8Q=,1071,1071,2961,1050,735,651,16212,16.745098,4050.471,...,20170311,20150802,1,0,Not_Provided,7,0,2017-03-11,2015-08-02,587
1,+NKVPkGwpoOWKQDdH3mtpaZGR5lx9fu5bOHixIRjsnI=,2610,2610,17385,3495,3270,2580,75045,18.718391,8045.0156,...,20170315,20151216,1,0,Not_Provided,7,0,2017-03-15,2015-12-16,455
2,+i2T+lq7TNR/gThVOEh6M3CdHEIgZhgeH1ENTjAgMyE=,6403,6403,39349,10260,6175,5966,79591,19.973293,3643.7224,...,20170319,20150920,1,0,Not_Provided,7,0,2017-03-19,2015-09-20,546
3,/67f8zgh70yyzqwntxaAuqqSrbibNC7KxG5rGBg4/hc=,130,130,390,55,80,65,1970,12.230769,4521.8457,...,20170331,20161029,1,0,Not_Provided,7,0,2017-03-31,2016-10-29,153
4,/UYp4Ued/yMVEf5OD13C9Hz8B/N78PBx13tglE3+gXA=,117,117,213,21,9,15,2409,19.564102,5146.8057,...,20170331,20161231,1,0,Not_Provided,7,0,2017-03-31,2016-12-31,90


# Model Prep and Pipeline Building

Next up we need to get our data in a format that can be fed to models. First, we will select the data and also cast it to its appropriate data types now that its been cleaned up.

In [22]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: string (nullable = true)
 |-- min_transaction_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- max_exp_date: date (nullable = true)
 |-- min_trans_date: date (nullable = true)
 |-- da

In [23]:
cols = data.select(data.days_used.cast('double'),
                   data.total_transactions.cast('double'),
                   data.total_num_25.cast('double'),
                   data.total_num_50.cast('double'),
                   data.total_num_75.cast('double'),
                   data.total_num_985.cast('double'),
                   data.total_num_100.cast('double'),
                   data.avg_unique_songs.cast('double'),
                   data.avg_total_secs.cast('double'),
                   data.avg_plan_length.cast('double'),
                   data.avg_expected_plan_price.cast('double'),
                   data.avg_actual_plan_price.cast('double'),
                   data.date_diff.cast('double'),
                   data.age.cast('double'),
                   data.city,
                   data.gender,
                   data.registered_via,
                   data.is_churn.cast('double')               
                  )

cols.limit(1).toPandas()

Unnamed: 0,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,date_diff,age,city,gender,registered_via,is_churn
0,1071.0,1071.0,2961.0,1050.0,735.0,651.0,16212.0,16.745098,4050.471,30.0,99.04762,99.04762,587.0,0.0,1,Not_Provided,7,0.0


Indexing categorical variables, then encoding them

In [24]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

#City, gender, registered via
#index
city_indexer = StringIndexer(inputCol='city', outputCol='cityIndex')
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')
registered_via_indexer = StringIndexer(inputCol='registered_via', outputCol='registered_viaIndex')

#encoders
city_encoder = OneHotEncoder(inputCol='cityIndex', outputCol='cityVec')
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')
registered_via_encoder = OneHotEncoder(inputCol='registered_viaIndex', outputCol='registered_viaVec')


Building Assembler

In [25]:
assembler = VectorAssembler(inputCols=['days_used','total_transactions','total_num_25','total_num_50','total_num_75',
                                      'total_num_985','total_num_100','avg_unique_songs','avg_total_secs','avg_plan_length',
                                      'avg_expected_plan_price','avg_actual_plan_price','date_diff','age','cityVec','genderVec',
                                      'registered_viaVec'], outputCol='features')

Importing and setting up logistic regression

In [26]:
from pyspark.ml.classification import LogisticRegression

log_reg = LogisticRegression(labelCol='is_churn')

Pipeline Time!

In [27]:
from pyspark.ml import Pipeline

pipe = Pipeline(stages=[city_indexer,
                       gender_indexer,
                       registered_via_indexer,
                       city_encoder,
                       gender_encoder,
                       registered_via_encoder,
                       assembler,
                       log_reg])

Splitting the data to a 70 / 30 - test / train

In [28]:
train_data, test_data = cols.randomSplit([.7,.3])

Fitting the Model

In [29]:
fit = pipe.fit(train_data)

In [30]:
results = fit.transform(test_data)



# Evaluating the LR

In [31]:
log_reg_model = fit.stages[-1]

In [32]:
print log_reg_model.coefficients

[-2.76861738294e-06,-2.76906642165e-06,2.76129596462e-08,1.10384579839e-07,-1.41549541285e-07,-1.76662626343e-07,-3.33158865124e-08,0.00119160927337,3.12806197438e-17,0.00183171592377,0.000966000200896,0.000856394610835,-0.000108746679117,0.00070305104041,-0.073810076421,0.0325104039908,0.0429620802755,0.041198160646,0.060604062683,0.0571704979203,0.0841774793329,0.0115678181962,0.063762188967,0.0369474132174,0.0575373418642,0.0211504365713,0.106722215846,0.0572012113462,0.0205644179312,0.0748219495581,0.0547873462334,0.011295756786,0.0172637063013,-0.0294404827105,-0.0802804552893,0.063089695364,-0.191192325923,0.0736578550748,0.215443133875,0.325510511337]


In [37]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

data_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_churn')

auc = data_eval.evaluate(results)

print auc

0.835591994753


In [47]:
#build confusion matrix
predicted = results.map(lambda b: b[3])

a.take(1)

[0.0]

AttributeError: 'PipelineModel' object has no attribute 'predict'

# Building Decision Tree

In [104]:
from pyspark.ml.classification import DecisionTreeClassifier

dec_tree = DecisionTreeClassifier(labelCol='is_churn')

In [105]:
dc_pipe = Pipeline(stages=[city_indexer,
                       gender_indexer,
                       registered_via_indexer,
                       city_encoder,
                       gender_encoder,
                       registered_via_encoder,
                       assembler,
                       dec_tree])

In [106]:
dc_fit = dc_pipe.fit(train_data)

IllegalArgumentException: u'DecisionTreeClassifier was given input with invalid label column is_churn, without the number of classes specified. See StringIndexer.'

In [None]:
dc_results = dc_fit.transform(test_data)

In [None]:
dc_data_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_churn')

dc_auc = dc_data_eval.evaluate(dc_results)

print dc_auc