# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
#only for my local spark setup on windows
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = 'pyspark-shell'

#import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit
import datetime
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
%matplotlib inline
from pyspark.sql.types import IntegerType, DateType, StructType,StructField, StringType, FloatType
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import re
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import random
import datetime
from pyspark.sql import DataFrame
from ua_parser import user_agent_parser
import pickle

In [2]:
# install ua-parser for url parsing. Credit: https://github.com/ua-parser/uap-python
! pip install ua-parser



You should consider upgrading via the 'c:\users\aditya\appdata\local\programs\python\python38-32\python.exe -m pip install --upgrade pip' command.


In [3]:
# create a Spark session
spark = SparkSession.builder \
    .master("local[8]") \
    .appName("Sparkify Project 1") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [4]:
#load mini dataset
df = spark.read.json('mini_sparkify_event_data.json')

In [5]:
# look at all the columns and datatypes
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [6]:
#no. of records
df.count()

286500

It seems these are records of every interaction a user has had with the app. There are 286,500 records

In [7]:
df.take(1)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')]

In [12]:
df.select("status").dropDuplicates().show()

+------+
|status|
+------+
|   307|
|   404|
|   200|
+------+



In [16]:
df.select("page").dropDuplicates().show(30, truncate = False)

+-------------------------+
|page                     |
+-------------------------+
|Cancel                   |
|Submit Downgrade         |
|Thumbs Down              |
|Home                     |
|Downgrade                |
|Roll Advert              |
|Logout                   |
|Save Settings            |
|Cancellation Confirmation|
|About                    |
|Submit Registration      |
|Settings                 |
|Login                    |
|Register                 |
|Add to Playlist          |
|Add Friend               |
|NextSong                 |
|Thumbs Up                |
|Help                     |
|Upgrade                  |
|Error                    |
|Submit Upgrade           |
+-------------------------+



In [33]:
df.select("userId").dropDuplicates().count()

226

My assumptions for the content :  

1.) Artist, Song : The artist & song being listened to. Useful for genre classifications, but may not be as usueful as a search event for an artist would be.  

2.) Firstname, Lastname, Location, gender, userId : The specifics. Only the userId, gender and location are likely relevant to us.  

3.) Auth : This seems directly connected to the userId column, as in you need to be logged in for that to populate. Other items like firstname etc wont be populated unless you are logged in, so we focus on this  

4.) Method, status : Status is an unknown variable, with just three numbers which are unclear - likely an event code? Method is the HTTP method, which is not relevant and will just have PUT and GET

5.) sessionId : Seems to be generated per session and not unique to user.

6.) Registration and TS: TS is simply the timestamp of the event. Registration is likely when they registered. 

7.) Level : A critical value, free vs paid

8.) UserAgent : The broser/device where they accessed the service from.

9.) Page : The actual event. Rather interesting, see above.

In [17]:
#Check for Nulls - check in userid's and session id's as they are identifiers. 
df.createOrReplaceTempView("spark_table")

spark.sql('''
            SELECT COUNT(userId), COUNT(sessionId) 
            FROM spark_table 
            WHERE userId IS NULL OR sessionId IS NULL
            '''
            ).show()



+-------------+----------------+
|count(userId)|count(sessionId)|
+-------------+----------------+
|            0|               0|
+-------------+----------------+



In [18]:
#No nulls. lets check blank user id's

spark.sql('''
        SELECT COUNT(userId)
        FROM spark_table 
        WHERE userId =''
        '''
        ).show()


+-------------+
|count(userId)|
+-------------+
|         8346|
+-------------+



In [19]:
# lets check blank sessionid's

spark.sql('''
        SELECT COUNT(sessionId)
        FROM spark_table 
        WHERE sessionId =''
        '''
        ).show()

+----------------+
|count(sessionId)|
+----------------+
|               0|
+----------------+



No blank sessionid's. Makes sense, as any activity would be part of a session

In [20]:
#taking a look at blank user id's

spark.sql('''
        SELECT userId, page, sessionId
        FROM spark_table
        WHERE userId =''
        '''
        ).show()

+------+-----+---------+
|userId| page|sessionId|
+------+-----+---------+
|      | Home|        8|
|      | Help|        8|
|      | Home|        8|
|      |Login|        8|
|      | Home|      240|
|      |Login|      240|
|      |Login|      100|
|      |Login|      241|
|      | Home|      187|
|      |Login|      187|
|      | Home|      187|
|      | Home|      187|
|      |Login|      187|
|      | Home|       27|
|      |About|       27|
|      | Home|       27|
|      | Home|      187|
|      |Login|      187|
|      | Home|      257|
|      | Home|      100|
+------+-----+---------+
only showing top 20 rows



It seems the blank id's are associated with non-logged in users, which makes sense. We should drop these, as there is no concept of churn when it comes to unlogged users - we cannot differentiate them. However, they might be useful if we wanted to, say, test some changes which lead to more users signing up.   

In [21]:
#removing blank user id's
df_new = df.filter(df['userId']!='')

In [22]:
#fix the timestamp
fix_time = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
df_new = df_new.withColumn("time", fix_time(df_new.ts))

#split the location into city and state
city_name = udf(lambda x:x.split(", ")[0])
state_name = udf(lambda x:x.split(", ")[1])
df_new = df_new.withColumn("city", city_name(df_new.location))
df_new = df_new.withColumn("state", state_name(df_new.location))

#split the useragent into browser and OS
browser_name = udf(lambda x:user_agent_parser.ParseUserAgent(x)["family"])
os_name = udf(lambda x:user_agent_parser.ParseOS(x)["family"])
df_new = df_new.withColumn("browser", browser_name(df_new.userAgent))
df_new = df_new.withColumn("OS", os_name(df_new.userAgent))

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

---

Let us explore some churn events. While we will add the downgrade indicator here, I will not explore it further, focusing on the Cancellation churn. Note the "Cancel" page column - this is very strongly correlated with the cancellation. This likely means very few back out after reaching this stage.

In [23]:
# add flags for the two churn types, cancellations & downgrades. 
cancel_churn = udf(lambda x: int(x=="Cancellation Confirmation"), IntegerType())
down_churn = udf(lambda x: int(x=="Submit Downgrade"), IntegerType())

df_new = df_new.withColumn("Cancelled", cancel_churn("page"))
df_new = df_new.withColumn("Downgrade", down_churn("page"))

In [24]:
df_new.createOrReplaceTempView("spark_table")

In [25]:
#explore a bit : Distribution by states
state = spark.sql('''
        SELECT state, COUNT(DISTINCT userId) AS cnt
        FROM spark_table
        GROUP BY State
        ORDER BY cnt DESC
        '''
        ).show()

+-----------+---+
|      state|cnt|
+-----------+---+
|         CA| 33|
|         TX| 16|
|   NY-NJ-PA| 15|
|         FL| 14|
|         AZ|  7|
|         CT|  7|
|      MO-IL|  6|
|   IL-IN-WI|  6|
|         NC|  6|
|      NC-SC|  6|
|      MA-NH|  5|
|         NY|  5|
|PA-NJ-DE-MD|  5|
|         MI|  5|
|         AL|  4|
|DC-VA-MD-WV|  4|
|         WA|  4|
|         GA|  4|
|         CO|  4|
|      MN-WI|  3|
+-----------+---+
only showing top 20 rows



Most users are from California, evidently

In [26]:
# by Gender

spark.sql('''
        SELECT Cancelled, gender, COUNT(DISTINCT userId) AS cnt
        FROM spark_table
        GROUP BY Cancelled,gender 
        Order by Cancelled
        '''
        ).show()

+---------+------+---+
|Cancelled|gender|cnt|
+---------+------+---+
|        0|     F|104|
|        0|     M|121|
|        1|     F| 20|
|        1|     M| 32|
+---------+------+---+



A higher number of male users, but the cancellations dont seem to be influenced by it, as the ratios are not too far off. Too few users to generalize here

In [27]:
#by browser
spark.sql('''
        SELECT Cancelled, Browser, COUNT(DISTINCT userId) AS cnt
        FROM spark_table
        GROUP BY Cancelled, Browser
        ORDER BY Cancelled
        '''
        ).show()

+---------+-------------+---+
|Cancelled|      Browser|cnt|
+---------+-------------+---+
|        0|           IE| 12|
|        0|Mobile Safari| 16|
|        0|       Safari| 30|
|        0|      Firefox| 50|
|        0|       Chrome|117|
|        1|           IE|  1|
|        1|Mobile Safari|  4|
|        1|       Safari|  6|
|        1|      Firefox| 16|
|        1|       Chrome| 25|
+---------+-------------+---+



In [28]:
#by platform
spark.sql('''
        SELECT OS, COUNT(DISTINCT userId) AS cnt
        FROM spark_table
        GROUP BY OS
        ORDER BY cnt DESC
        '''
        ).show()

+--------+---+
|      OS|cnt|
+--------+---+
| Windows|111|
|Mac OS X| 86|
|     iOS| 16|
|   Linux|  7|
|  Ubuntu|  5|
+--------+---+



# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [114]:
df_new.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- browser: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- Downgrade: integer (nullable = true)



### Categorical features :

1.) Gender - A demographic variable.     
2.) Level - Free vs paid - An essential variable
3.) Browser - May indicate user demographics and technical acumen. Can also be used for mobile vs non mobile usage split
4.) OS - Shows the technical acumen of the user, and can help target optimizations by platform
5.) Cancelled : Predicting churn    

For the future :  
1.) State - A very sparse vector, may take the top 5 and put the rest to 0, as a demographic check (wealth etc) 
  
### Numeric Features :  

Time window based :  

1.) Page events - Add friend, Add to Playlist etc. Skipping About, Settings, Save settings, Logout and the churn classifiers  - Different measures of activity

2.) No. of songs, No. of artists, No. of sessions, listening time - To understand usage  

Non time window based :  

3.) How long the person has been a user for.  


In [29]:
#drop unneeded columns

df_selected = df_new.select("artist", "gender", "length", "level", "page", "registration", "sessionId", "song", "ts", "Time", "userId", "Browser", "OS", "Cancelled")

In [30]:
# Fix Time column - cast as date 
df_selected = df_selected.withColumn("Date", df_selected["Time"].cast(DateType()))

We have an issue here - spark dataframes dont have an index. I can use ts (timestamp) as a proxy for an index, but two different users can have the same ts, which would make it non-unique. Hence, I will order by date and make an index column.

This becomes important in my second approach at the problem (Approach 2: Event based)

In [31]:
# add an index
w = Window.orderBy("Date")
df_selected = df_selected.withColumn("index", row_number().over(w))


In [32]:
df_selected.cache()

DataFrame[artist: string, gender: string, length: double, level: string, page: string, registration: bigint, sessionId: bigint, song: string, ts: bigint, Time: string, userId: string, Browser: string, OS: string, Cancelled: int, Date: date, index: int]

### Twin Approaches to the problem :

When working on this, I realized that just 226 users are a bit too few for most machine learning models. Therefore, I decided on a dual approach :

1.) Event Based : Use the churn event itself and model what led the user to it, vs other, non churn events. This of course, risks taking the events right preceding the churn events as a non-churn event. Likely will optimize this more in the future

2.) User based : Check the churned user and see his pattern of usage vs the non-churned user. I will take the whole activity window for him, as the cancellation of the account (churn event) will remove him from our dataset. The limit : no. of users is just 226


## Approach 1 : Event based

I will first seperate the indexed events into two lists. This just made a number of exploratory approaches easier. 

In [34]:
# use churn and not churn to find all indexes and make two lists
churn = df_selected.filter(df_selected["Cancelled"] == 1).select("index").rdd.flatMap(lambda x: x).collect()
not_churn = df_selected.filter(df_selected["Cancelled"] == 0).select("index").rdd.flatMap(lambda x: x).collect()


In [35]:
len(churn)

52

In [36]:
len(not_churn)

278102

As expected, a huge number of not_churn events. We would need to take a sample here.

In [37]:
#random sampling from not_churn, 1:10 ratio, fixing seed
random.seed(50)
not_churn = random.sample(not_churn, 520)

In [38]:
#removing any nulls
df_selected = df_selected.fillna(0)

In [39]:
# Feature Creation function. Note that we output a list. This is because using a dataframe in an appending loop is a memory nightmare !

def create_features_i(df_num, i):
    '''
    Create features for the dataframe df, given a specific index
    
    INPUT 
    df_num : Dataframe required
    i : Relevant index
    
    OUTPUT
    list_return : A list with the features
    
    '''

    df_num.createOrReplaceTempView("num_table")
    
    df_numerics = spark.sql('''
                    SELECT
                    MAX(index) AS index,
                    COUNT(song) AS songs,
                    COUNT(DISTINCT artist) AS artists,
                    COUNT(DISTINCT sessionId) AS sessions,
                    CAST(SUM(length) AS INT) AS list_time,
                    CAST(((MAX(ts-registration))/10000) AS FLOAT) as age,
                    SUM(CASE WHEN page = 'Add Friend' THEN 1 ELSE 0 END) AS frnd_cnt,
                    SUM(CASE WHEN page = 'Roll Advert' THEN 1 ELSE 0 END) AS ad_cnt,
                    SUM(CASE WHEN page = 'Add to Playlist' THEN 1 ELSE 0 END) AS playl_cnt,
                    SUM(CASE WHEN page = 'Cancel' THEN 1 ELSE 0 END) AS cncl_cnt,
                    SUM(CASE WHEN page = 'Error' THEN 1 ELSE 0 END) AS err_cnt,
                    SUM(CASE WHEN page = 'Help' THEN 1 ELSE 0 END) AS hlp_cnt,
                    SUM(CASE WHEN page = 'NextSong' THEN 1 ELSE 0 END) AS nxt_cnt,
                    SUM(CASE WHEN page = 'Thumbs Down' THEN 1 ELSE 0 END) AS tbdn_cnt,
                    SUM(CASE WHEN page = 'Thumbs Up' THEN 1 ELSE 0 END) AS tbup_cnt,
                    SUM(CASE WHEN page = 'Upgrade' THEN 1 ELSE 0 END) AS up_cnt       
                    FROM num_table
                    GROUP BY userId
          ''')            
     
    df_cat = df_num.filter(df_num["index"]==i).select("index", "gender", "OS", "level", "browser")
    df_temp = df_numerics.join(df_cat, ["index"])
    list_return = df_temp.toPandas().values.tolist()  
    return list_return

Now, what we want to do is identify the churn event, look back at all events for that userid that led to it, and make it into a feature

In [76]:
#build the churn dataset
list_churn = []
counter = 0
for i in churn:
    try:
        user = df_selected.filter(df_selected["index"]==i).select("userId").head()[0]
        end_date = df_selected.filter(df_selected["index"]==i).select("Date").head()[0]
        d = datetime.timedelta(days = 30)
        start_date = end_date - d
        df_x = df_selected.filter((df_selected["Date"].between(start_date, end_date)) & (df_selected["userId"]==user) & (df_selected["index"]<= i))
        counter += 1
        print("Index {}, no {}. \r".format(i, counter))
        list_df = create_features_i(df_x, i)
        list_churn.append(list_df)
    except:
        print("Error in index no. {}".format(i))
        continue

Index 2479, no 12. 
Index 6121, no 13. 
Index 19215, no 14. 
Index 19437, no 15. 
Index 20089, no 16. 
Index 27964, no 17. 
Index 30928, no 18. 
Index 32593, no 19. 
Index 48117, no 20. 
Index 50855, no 21. 
Index 55498, no 22. 
Index 58751, no 23. 
Index 59979, no 24. 
Index 62458, no 25. 
Index 70431, no 26. 
Index 72170, no 27. 
Index 72615, no 28. 
Index 73867, no 29. 
Index 74003, no 30. 
Index 86102, no 31. 
Index 91578, no 32. 
Index 93622, no 33. 
Index 95929, no 34. 
Index 98701, no 35. 
Index 100100, no 36. 
Index 104008, no 37. 
Index 109834, no 38. 
Index 110133, no 39. 
Index 123459, no 40. 
Index 133868, no 41. 
Index 154296, no 42. 
Index 155523, no 43. 
Index 155944, no 44. 
Index 156093, no 45. 
Index 156226, no 46. 
Index 161896, no 47. 
Index 164171, no 48. 
Index 169517, no 49. 
Index 186354, no 50. 
Index 196552, no 51. 
Index 197154, no 52. 
Index 203639, no 53. 
Index 206866, no 54. 
Index 215979, no 55. 
Index 216381, no 56. 
Index 219643, no 57. 
Index 224484, 

In [77]:
#quick fix for list of lists
list_churn = [item for items in list_churn for item in items]

In [79]:
#writing the results in a pickle file
with open('churn.pkl', 'wb') as f:
    pickle.dump(list_churn, f)

In [91]:
#build the not_churn dataset
list_not_churn = []
counter = 0

for i in not_churn:
    try:
        user = df_selected.filter(df_selected["index"]==i).select("userId").head()[0]
        end_date = df_selected.filter(df_selected["index"]==i).select("Date").head()[0]
        d = datetime.timedelta(days = 30)
        start_date = end_date - d
        df_x = df_selected.filter((df_selected["Date"].between(start_date, end_date)) & (df_selected["userId"]==user) & (df_selected["index"]<= i))
        counter += 1
        print("Index {}, no {}. \r".format(i, counter))
        list_df = create_features_i(df_x, i)
        list_not_churn.append(list_df)
    except:
        print("Error in index no. {}".format(i))
        continue


Index 260904, no 1. 
Index 139582, no 2. 
Index 190931, no 3. 
Index 127159, no 4. 
Index 248125, no 5. 
Index 173079, no 6. 
Index 44672, no 7. 
Index 166435, no 8. 
Index 117675, no 9. 
Index 44699, no 10. 
Index 80402, no 11. 
Index 182060, no 12. 
Index 51658, no 13. 
Index 182238, no 14. 
Index 167463, no 15. 
Index 116622, no 16. 
Index 99099, no 17. 
Index 36047, no 18. 
Index 172487, no 19. 
Index 224897, no 20. 
Index 46647, no 21. 
Index 33566, no 22. 
Index 175287, no 23. 
Index 225645, no 24. 
Index 3003, no 25. 
Index 119158, no 26. 
Index 176959, no 27. 
Index 139586, no 28. 
Index 227806, no 29. 
Index 55478, no 30. 
Index 105446, no 31. 
Index 232156, no 32. 
Index 142158, no 33. 
Index 241217, no 34. 
Index 66400, no 35. 
Index 141412, no 36. 
Index 109736, no 37. 
Index 64409, no 38. 
Index 215927, no 39. 
Index 61637, no 40. 
Index 260153, no 41. 
Index 223874, no 42. 
Index 271383, no 43. 
Index 65608, no 44. 
Index 261880, no 45. 
Index 270894, no 46. 
Index 157589

Index 55811, no 368. 
Index 208164, no 369. 
Index 274291, no 370. 
Index 207127, no 371. 
Index 129928, no 372. 
Index 151184, no 373. 
Index 65305, no 374. 
Index 164472, no 375. 
Index 63013, no 376. 
Index 180664, no 377. 
Index 260303, no 378. 
Index 85363, no 379. 
Index 85430, no 380. 
Index 4815, no 381. 
Index 17987, no 382. 
Index 130905, no 383. 
Index 12575, no 384. 
Index 45057, no 385. 
Index 232719, no 386. 
Index 171861, no 387. 
Index 132118, no 388. 
Index 88346, no 389. 
Index 3158, no 390. 
Index 170127, no 391. 
Index 191982, no 392. 
Index 28719, no 393. 
Index 254863, no 394. 
Index 227210, no 395. 
Index 15304, no 396. 
Index 270994, no 397. 
Index 2596, no 398. 
Index 155731, no 399. 
Index 222114, no 400. 
Index 245139, no 401. 
Index 203783, no 402. 
Index 93385, no 403. 
Index 205896, no 404. 
Index 61815, no 405. 
Index 235935, no 406. 
Index 176805, no 407. 
Index 34947, no 408. 
Index 148850, no 409. 
Index 227013, no 410. 
Index 177671, no 411. 
Index 27

In [92]:
#quick fix
list_not_churn = [item for items in list_not_churn for item in items]

In [93]:
#writing it into a file

with open('not_churn.pkl', 'wb') as f:
    pickle.dump(list_not_churn, f)

# Modeling - Approach 1
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [41]:
#create empty dataframe

schema_1 = StructType([
  StructField('index', IntegerType(), False),
  StructField('songs', IntegerType(), False),
  StructField('artists', IntegerType(), False),
  StructField('sessions', IntegerType(), False),
  StructField('list_time', IntegerType(), False),
  StructField('age', FloatType(), False),
  StructField('frnd_cnt', IntegerType(), False),
  StructField('ad_cnt', IntegerType(), False),
  StructField('playl_cnt', IntegerType(), False),
  StructField('cncl_cnt', IntegerType(), False),
  StructField('err_cnt', IntegerType(), False),
  StructField('hlp_cnt', IntegerType(), False),
  StructField('nxt_cnt', IntegerType(), False),
  StructField('tbdn_cnt', IntegerType(), False),
  StructField('tbup_cnt', IntegerType(), False),
  StructField('up_cnt', IntegerType(), False),
  StructField('gender', StringType(), False),
  StructField('OS', StringType(), False),
  StructField('level', StringType(), False),
  StructField('browser', StringType(), False),
  ])


In [42]:
# regenerating the churn list and converting it into a spark dataset

with open('churn.pkl', 'rb') as f:
    churn_list = pickle.load(f)
    
rdd = spark.sparkContext.parallelize(churn_list)
df_churn = spark.createDataFrame(rdd,schema_1)

In [43]:
# # regenerating the not_churn list and converting it into a spark dataset
with open('not_churn.pkl', 'rb') as f:
    not_churn_list = pickle.load(f)
    
rdd = spark.sparkContext.parallelize(not_churn_list)
df_not_churn = spark.createDataFrame(rdd,schema_1)

In [44]:
# Add back the churn column as label variable
df_churn = df_churn.withColumn("label", lit(1))
df_not_churn = df_not_churn.withColumn("label", lit(0))

In [45]:
# ready the final set of data
df = df_churn.union(df_not_churn)

In [46]:
#Final indexing in prep for Encoding and vectorizing

def indexing(df_cat, categories):
    '''
    Map Categorical labels to numbers 
    
    INPUT 
    df_cat : Dataframe required
    
    OUTPUT
    df_return : Dataframe with categorical features mapped to numbers
    
    '''

    stages = []


    for categoricalCol in categories:
        stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_idx')
        stages += [stringIndexer]
    
    pipeline = Pipeline(stages = stages)
    pipelineModel = pipeline.fit(df_cat)
    df_return= pipelineModel.transform(df_cat)
    
    for categoricalCol in categories:
        df_return = df_return.drop(categoricalCol)
    
    return df_return

In [47]:
#map the categorical columns to numbers
idx_col = ["browser", "OS", "gender", "level"]
df = indexing(df, idx_col)

In [48]:
# Browser and OS need one hot encoding
encoder = OneHotEncoder(inputCols=["browser_idx", "OS_idx"], outputCols=["browser_1", "OS_1"])

In [49]:
model = encoder.fit(df)
df = model.transform(df)

Note : Here there is a key difference : I drop the "Cancel" feature column. The reason : It corresponds too closely to the event we want to measure. It of course means we lose data on users who do not press the confirm button - so maybe I will keep it for the bigger dataset. 

In [50]:
#vector assembly
vecAssembler = VectorAssembler(inputCols=['songs', 'artists','sessions', 'list_time', 'age', 'frnd_cnt', 'ad_cnt','playl_cnt','err_cnt','hlp_cnt','nxt_cnt','tbdn_cnt','tbup_cnt','up_cnt','browser_1','OS_1', 'gender_idx','level_idx' ], outputCol="features")
df = vecAssembler.transform(df)

In [51]:
#scaling
scaler2 = MinMaxScaler(inputCol="features", outputCol="ScaledNumFeatures2")
scalerModel = scaler2.fit(df)
df = scalerModel.transform(df)

In [52]:
#split into test train. Due to the 1:10 ratio of churn to not churn, stratify it

zeros = df.filter(df["label"]==0)
ones = df.filter(df["label"]==1)
# split datasets into training and testing

train0, test0 = zeros.randomSplit([0.8,0.2], seed=50)
train1, test1 = ones.randomSplit([0.8,0.2], seed=50)
                                
# stack datasets back together
train = train0.union(train1)
test = test0.union(test1)

---

#### Model : Logistic Regression

One of the most widely used models, this model is fast, simple to use and works well with the limited data we have

In [53]:
lr = LogisticRegression(featuresCol = 'ScaledNumFeatures2', labelCol = 'label', maxIter=5)
lrmodel = lr.fit(train)
predictions = lrmodel.transform(test)

In [54]:
lr_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
lr_accuracy = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "accuracy"})
lr_f1score = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "f1"})

In [55]:
print('Logistic Regression Metrics:')
print('Accuracy: {}'.format(lr_accuracy))
print('F-1 Score:{}'.format(lr_f1score))
print('Accuracy: {}'.format(lr_accuracy))


Logistic Regression Metrics:
Accuracy: 0.9482758620689655
F-1 Score:0.9231003967043027
Accuracy: 0.9482758620689655


Not bad ! The limited data really puts a question mark here, but the model seems to be doing well with an F1 score of 92%

#### Model : Random Forest Classifier
This model is a great model for small datasets, fast and deals well with features that may or may not be relevant to the final outcome at all. 

In [56]:
rf = RandomForestClassifier(labelCol="label", featuresCol="ScaledNumFeatures2")
rfmodel = rf.fit(train)
predictions = rfmodel.transform(test)

In [57]:
rf_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
rf_accuracy = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "accuracy"})
rf_f1score = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "f1"})

In [58]:
print('Randon Forest Metrics:')
print('Accuracy: {}'.format(rf_accuracy))
print('F-1 Score:{}'.format(rf_f1score))

Randon Forest Metrics:
Accuracy: 0.9482758620689655
F-1 Score:0.9231003967043027


Similar Accuracy as regression. Lets try one more

#### Model : Gradient Boosted Trees
A variation of Random forest that may work better here

In [59]:
gbt = GBTClassifier(labelCol="label", featuresCol="ScaledNumFeatures2", maxIter=10)
gbtmodel = gbt.fit(train)
predictions = gbtmodel.transform(test)

In [60]:
gbt_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
gbt_accuracy = gbt_evaluator.evaluate(predictions, {gbt_evaluator.metricName: "accuracy"})
gbt_f1score = gbt_evaluator.evaluate(predictions, {gbt_evaluator.metricName: "f1"})

In [61]:
print('Gradient Boosted Trees Metrics:')
print('Accuracy: {}'.format(rf_accuracy))
print('F-1 Score:{}'.format(rf_f1score))

Gradient Boosted Trees Metrics:
Accuracy: 0.9482758620689655
F-1 Score:0.9231003967043027


Identical scores. I tried a few others but it seems we are hitting the limits of what we can predict with this approach here.

## Approach 2 : User based

In [None]:
# use churn and not churn to find all userids and make two lists
churn = df_selected.filter(df_selected["Cancelled"] == 1).select("userId").dropDuplicates().rdd.flatMap(lambda x: x).collect()
not_churn = df_selected.filter(df_selected["Cancelled"] == 0).select("userId").dropDuplicates().rdd.flatMap(lambda x: x).collect()

In [62]:
# Feature Creation function

def create_features_u(df_num, i):
    '''
    Create features for the dataframe df, given a specific userid
    
    INPUT 
    df_num : Dataframe required
    i : useriD
    
    OUTPUT
    list_return : List with Features
    
    '''

    df_num.createOrReplaceTempView("num_table")
    
    df_numerics = spark.sql('''
                    SELECT
                    DISTINCT(userId),
                    COUNT(song) AS songs,
                    COUNT(DISTINCT artist) AS artists,
                    COUNT(DISTINCT sessionId) AS sessions,
                    CAST(SUM(length) AS INT) AS list_time,
                    CAST(((MAX(ts-registration))/10000) AS FLOAT) as age,
                    SUM(CASE WHEN page = 'Add Friend' THEN 1 ELSE 0 END) AS frnd_cnt,
                    SUM(CASE WHEN page = 'Roll Advert' THEN 1 ELSE 0 END) AS ad_cnt,
                    SUM(CASE WHEN page = 'Add to Playlist' THEN 1 ELSE 0 END) AS playl_cnt,
                    SUM(CASE WHEN page = 'Cancel' THEN 1 ELSE 0 END) AS cncl_cnt,
                    SUM(CASE WHEN page = 'Error' THEN 1 ELSE 0 END) AS err_cnt,
                    SUM(CASE WHEN page = 'Help' THEN 1 ELSE 0 END) AS hlp_cnt,
                    SUM(CASE WHEN page = 'NextSong' THEN 1 ELSE 0 END) AS nxt_cnt,
                    SUM(CASE WHEN page = 'Thumbs Down' THEN 1 ELSE 0 END) AS tbdn_cnt,
                    SUM(CASE WHEN page = 'Thumbs Up' THEN 1 ELSE 0 END) AS tbup_cnt,
                    SUM(CASE WHEN page = 'Upgrade' THEN 1 ELSE 0 END) AS up_cnt       
                    FROM num_table
                    GROUP BY userId
          ''')            
    # Lets join in the categorical features. We use the last known status based on our index
    
    df_cat = df_num.filter(df_num["userId"]==i).orderBy("index", ascending=False).limit(1).select("userId", "gender", "OS", "level", "browser")
    df_temp = df_numerics.join(df_cat, ["userId"])
    list_return = df_temp.toPandas().values.tolist()  
    return list_return


In [None]:
#build the churn dataset
list_churn = []
counter = 0
for i in churn:
    try:
        user = i
        counter += 1
        print("Index {}, no {}. \r".format(i, counter))
        list_df = create_features_u(df_selected, i)
        list_churn.append(list_df)
    except:
        print("Error in index no. {}".format(i))
        continue

In [None]:
#build the not_churn dataset
list_not_churn = []
counter = 0

for i in not_churn:
    try:
        user = i
        counter += 1
        print("Index {}, no {}. \r".format(i, counter))
        list_df = create_features_u(df_selected, i)
        list_not_churn.append(list_df)
    except:
        print("Error in index no. {}".format(i))
        continue

In [None]:
#fixes 
list_churn = [item for items in list_churn for item in items]
list_not_churn = [item for items in list_not_churn for item in items]

In [None]:
#write to Pickle
with open('churn2.pkl', 'wb') as f:
    pickle.dump(list_churn, f)

In [None]:
with open('not_churn2.pkl', 'wb') as f:
    pickle.dump(list_not_churn, f)

# Modeling - Approach 2

In [63]:
#create empty dataframe for vectors

schema_2 = StructType([
  StructField('userId', StringType(), False),
  StructField('songs', IntegerType(), False),
  StructField('artists', IntegerType(), False),
  StructField('sessions', IntegerType(), False),
  StructField('list_time', IntegerType(), False),
  StructField('age', FloatType(), False),
  StructField('frnd_cnt', IntegerType(), False),
  StructField('ad_cnt', IntegerType(), False),
  StructField('playl_cnt', IntegerType(), False),
  StructField('cncl_cnt', IntegerType(), False),
  StructField('err_cnt', IntegerType(), False),
  StructField('hlp_cnt', IntegerType(), False),
  StructField('nxt_cnt', IntegerType(), False),
  StructField('tbdn_cnt', IntegerType(), False),
  StructField('tbup_cnt', IntegerType(), False),
  StructField('up_cnt', IntegerType(), False),
  StructField('gender', StringType(), False),
  StructField('OS', StringType(), False),
  StructField('level', StringType(), False),
  StructField('browser', StringType(), False),
  ])


In [64]:
#read in the lists from pickle and create dataframe
with open('churn2.pkl', 'rb') as f:
    churn_list = pickle.load(f)
    
rdd = spark.sparkContext.parallelize(churn_list)
df_churn = spark.createDataFrame(rdd,schema_2)

In [65]:
with open('not_churn2.pkl', 'rb') as f:
    not_churn_list = pickle.load(f)
    
rdd = spark.sparkContext.parallelize(not_churn_list)
df_not_churn = spark.createDataFrame(rdd,schema_2)

In [66]:
# Add back the churn column as label variable
df_churn = df_churn.withColumn("label", lit(1))
df_not_churn = df_not_churn.withColumn("label", lit(0))

In [67]:
# ready the final set

df = df_churn.union(df_not_churn)

In [68]:
#Final indexing in prep for Encoding and vectorizing

def indexing(df_cat, categories):
    '''
    Map Categorical labels to numbers 
    
    INPUT 
    df_cat : Dataframe required
    
    OUTPUT
    df_return : Dataframe with categorical features mapped to numbers
    
    '''

    stages = []


    for categoricalCol in categories:
        stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_idx')
        stages += [stringIndexer]
    
    pipeline = Pipeline(stages = stages)
    pipelineModel = pipeline.fit(df_cat)
    df_return= pipelineModel.transform(df_cat)
    
    for categoricalCol in categories:
        df_return = df_return.drop(categoricalCol)
    
    return df_return

In [69]:
#map the categorical columns to numbers
idx_col = ["browser", "OS", "gender", "level"]
df = indexing(df, idx_col)

In [70]:
# Browser and OS need one hot encoding
encoder = OneHotEncoder(inputCols=["browser_idx", "OS_idx"], outputCols=["browser_1", "OS_1"])
model = encoder.fit(df)
df = model.transform(df)

In [71]:
#vector assembly
vecAssembler = VectorAssembler(inputCols=['songs', 'artists','sessions', 'list_time', 'age', 'frnd_cnt', 'ad_cnt','playl_cnt','err_cnt','hlp_cnt','nxt_cnt','tbdn_cnt','tbup_cnt','up_cnt','browser_1','OS_1', 'gender_idx','level_idx' ], outputCol="features")
df = vecAssembler.transform(df)

In [72]:
#scaling
scaler2 = MinMaxScaler(inputCol="features", outputCol="ScaledNumFeatures2")
scalerModel = scaler2.fit(df)
df = scalerModel.transform(df)

In [73]:
#split into test train. Due to the low ration of churn events, stratifying it

zeros = df.filter(df["label"]==0)
ones = df.filter(df["label"]==1)
# split datasets into training and testing

train0, test0 = zeros.randomSplit([0.8,0.2], seed=50)
train1, test1 = ones.randomSplit([0.8,0.2], seed=50)
                                
# stack datasets back together
train = train0.union(train1)
test = test0.union(test1)

---

#### Model : Logistic Regression

In [74]:
# Fitting and training
lr = LogisticRegression(featuresCol = 'ScaledNumFeatures2', labelCol = 'label', maxIter=5)
lrmodel = lr.fit(train)
predictions = lrmodel.transform(test)

In [75]:
#Evaluating
lr_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
lr_accuracy = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "accuracy"})
lr_f1score = lr_evaluator.evaluate(predictions, {lr_evaluator.metricName: "f1"})

In [76]:
print('Logistic Regression Metrics:')
print('Accuracy: {}'.format(lr_accuracy))
print('F-1 Score:{}'.format(lr_f1score))

Logistic Regression Metrics:
Accuracy: 0.896551724137931
F-1 Score:0.8476489028213167


A decent F1 score of 84%, though lower than approach 1. 

#### Model : Random Forest Classifier

In [77]:
# Fitting and training
rf = RandomForestClassifier(labelCol="label", featuresCol="ScaledNumFeatures2")
rfmodel = rf.fit(train)
predictions = rfmodel.transform(test)

In [78]:
#Evaluating
rf_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
rf_accuracy = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "accuracy"})
rf_f1score = rf_evaluator.evaluate(predictions, {rf_evaluator.metricName: "f1"})

In [79]:
print('Randon Forest Metrics:')
print('Accuracy: {}'.format(rf_accuracy))
print('F-1 Score:{}'.format(rf_f1score))

Randon Forest Metrics:
Accuracy: 0.7758620689655172
F-1 Score:0.7833947104117844


A lower f1 score, which is interesting, as I expected this to perform better - I assumed a lot of variables we took were noise and not really explanatory. 

#### Model : Gradient Boosted Trees

In [80]:
# Fitting and training
gbt = GBTClassifier(labelCol="label", featuresCol="ScaledNumFeatures2", maxIter=10)
gbtmodel = gbt.fit(train)
predictions = gbtmodel.transform(test)

In [81]:
#Evaluating
gbt_evaluator = MulticlassClassificationEvaluator(labelCol = "label",predictionCol="prediction")
gbt_accuracy = gbt_evaluator.evaluate(predictions, {gbt_evaluator.metricName: "accuracy"})
gbt_f1score = gbt_evaluator.evaluate(predictions, {gbt_evaluator.metricName: "f1"})

In [82]:
print('Gradient Boosted Trees Metrics:')
print('Accuracy: {}'.format(rf_accuracy))
print('F-1 Score:{}'.format(rf_f1score))

Gradient Boosted Trees Metrics:
Accuracy: 0.7758620689655172
F-1 Score:0.7833947104117844


No change from the Random forest classifier

## Conclusion:

It seems the first approach has a better accuracy, but the second likely makes use of the demographic variables better, and thus may perform better in the real world. The logistic regression model performs better on most fronts here. 

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.