# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [422]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

In [293]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Capstone Data Science") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [332]:
df = spark.read.json("../../mini_sparkify_event_data.json")

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [333]:
df.take(1)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')]

In [334]:
# Print out the data schema 
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [335]:
# Display each features statistics 
df.describe().toPandas()

Unnamed: 0,summary,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,count,228108,286500,278154,278154,286500.0,278154,228108.0,286500,278154,286500,286500,278154.0,286500.0,228108,286500.0,286500.0,278154,286500.0
1,mean,551.0852017937219,,,,114.41421291448516,,249.11718197783583,,,,,1535358834085.6191,1041.526554973822,Infinity,210.05459685863875,1540956889810.3943,,59682.02278593872
2,stddev,1217.7693079161377,,,,129.7672620114106,,99.23517921058313,,,,,3291321616.328124,726.7762634630807,,31.505078488421987,1507543960.82106,,109091.9499991056
3,min,!!!,Cancelled,Adelaida,F,0.0,Adams,0.78322,free,"Albany, OR",GET,About,1521380675000.0,1.0,ÃÂg ÃÂtti GrÃÂ¡a ÃÂsku,200.0,1538352117000.0,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10)...",
4,max,ÃÂlafur Arnalds,Logged Out,Zyonna,M,1321.0,Wright,3024.66567,paid,"Winston-Salem, NC",PUT,Upgrade,1543247354000.0,2474.0,ÃÂau hafa sloppiÃÂ° undan ÃÂ¾unga myrkursins,404.0,1543799476000.0,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,99.0


In [336]:
## Drop rows with missing values for userID and SessionID
df = df.dropna(how = "any", subset = ["userId", "sessionId"])
df.count()

286500

In [337]:
# Because of unregistered users, we have blank userIds 
df.select("userId").dropDuplicates().sort("userId").show()

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
|100008|
|100009|
|100010|
|100011|
|100012|
|100013|
|100014|
|100015|
|100016|
|100017|
+------+
only showing top 20 rows



In [338]:
## Only look at valid users 
df = df.filter(df["userId"] != "")

In [339]:
# New size of data set without unregistered users
df.count()

278154

In [340]:
# Number of unique users 
df.select("userId").dropDuplicates().count()

225

In [341]:
# Get all unique pages of the app
all_pages = df.select("page").dropDuplicates()
all_pages.show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



In [342]:
## The number of male vs female users 
df.select("userId","gender").dropDuplicates().groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|  104|
|     M|  121|
+------+-----+



In [343]:
## The number of paid vs free plan users 
df.select("userId","level").dropDuplicates().groupBy("level").count().show()

+-----+-----+
|level|count|
+-----+-----+
| free|  195|
| paid|  165|
+-----+-----+



### Changing timesrepresentation from milliseconds to seconds (date time)
Both registration and ts is represented in milliseconds in the data set. In order to be able to use standard date time functions we need them in datetime. 

In [344]:
## Need to convert registration and ts -- as they are using time stamps in ms
df.select("registration").show()

+-------------+
| registration|
+-------------+
|1538173362000|
|1538331630000|
|1538173362000|
|1538331630000|
|1538173362000|
|1538331630000|
|1538331630000|
|1538173362000|
|1538173362000|
|1538173362000|
|1538331630000|
|1538331630000|
|1538173362000|
|1538331630000|
|1538331630000|
|1538173362000|
|1538331630000|
|1537365219000|
|1538173362000|
|1538331630000|
+-------------+
only showing top 20 rows



In [345]:
# Registration --> registered (dt)
df = df.withColumn("registered",(F.col("registration")/1000).cast(T.TimestampType()))
df.select("registered").show()

+-------------------+
|         registered|
+-------------------+
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-30 20:20:30|
|2018-09-29 00:22:42|
|2018-09-29 00:22:42|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-30 20:20:30|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-30 20:20:30|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
|2018-09-19 15:53:39|
|2018-09-29 00:22:42|
|2018-09-30 20:20:30|
+-------------------+
only showing top 20 rows



In [346]:
# ts as timestamp (dt)
df = df.withColumn("timestamp",(F.col("ts")/1000).cast(T.TimestampType()))
df.select("timestamp").show()

+-------------------+
|          timestamp|
+-------------------+
|2018-10-01 02:01:57|
|2018-10-01 02:03:00|
|2018-10-01 02:06:34|
|2018-10-01 02:06:56|
|2018-10-01 02:11:16|
|2018-10-01 02:11:18|
|2018-10-01 02:14:46|
|2018-10-01 02:14:59|
|2018-10-01 02:15:05|
|2018-10-01 02:18:04|
|2018-10-01 02:19:06|
|2018-10-01 02:19:10|
|2018-10-01 02:20:18|
|2018-10-01 02:22:55|
|2018-10-01 02:22:56|
|2018-10-01 02:24:01|
|2018-10-01 02:26:16|
|2018-10-01 02:27:48|
|2018-10-01 02:28:07|
|2018-10-01 02:29:04|
+-------------------+
only showing top 20 rows



In [347]:
df.select("userId").take(5)

[Row(userId='30'),
 Row(userId='9'),
 Row(userId='30'),
 Row(userId='9'),
 Row(userId='30')]

In [348]:
df.filter(F.col("userId")=="30").select("userId","page","registered","timestamp").sort("timestamp").toPandas().head(30)

Unnamed: 0,userId,page,registered,timestamp
0,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:01:57
1,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:06:34
2,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:11:16
3,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:14:59
4,30,Add to Playlist,2018-09-29 00:22:42,2018-10-01 02:15:05
5,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:18:04
6,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:20:18
7,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:24:01
8,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:28:07
9,30,NextSong,2018-09-29 00:22:42,2018-10-01 02:31:49


### Defining Churn 
According to this blog post on [How to Reduce Customer Churn](https://blog.hubspot.com/service/how-to-reduce-customer-churn); 

>"Customer churn -- also known as customer attrition -- refers to the rate at which customers who purchase or subscribe to your product or service offering end their relationship with you and stop bringing in revenue for your business."

For this case, the registered users that either subscribe to the music service on the free or paid plan, churns when they confrim the cancellation of service. Hence, a good meassure of churn is the number of Submits to the "Cancellation Confirmation" page. 

In [349]:
# Find users that Submit Cancellation Confirmation
df.filter("page = 'Cancellation Confirmation'").show()

+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+-------------------+-------------------+
|artist|     auth|firstName|gender|itemInSession| lastName|length|level|            location|method|                page| registration|sessionId|song|status|           ts|           userAgent|userId|         registered|          timestamp|
+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+-------------------+-------------------+
|  null|Cancelled|   Adriel|     M|          104|  Mendoza|  null| paid|  Kansas City, MO-KS|   GET|Cancellation Conf...|1535623466000|      514|null|   200|1538943990000|"Mozilla/5.0 (Mac...|    18|2018-08-30 12:04:26|2018-10-07 22:26:30|
|  null|Cancelled|    Diego|     M|     

In [350]:
# We define a new "flag_churn_event" and set it to 1 if it happened, and 0 otherwise. 
flag_churn_event = F.udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, T.IntegerType())

# Add churn (cancel confirmation) column 
df = df.withColumn("churn", flag_churn_event("page"))

In [351]:
## Number of users that has churned 
users_churned = df.where(df.churn==1).dropDuplicates().select("userId").toPandas()["userId"].tolist()
len(users_churned)

52

In [352]:
# DataFrame of churned users 
df_churned = df.filter(df.userId.isin(users_churned))
# Data Frame of users not churned 
df_not_churned = df.filter(~df.userId.isin(users_churned))

In [353]:
## Number of users that hasn't churned 
df_not_churned.select("userId").dropDuplicates().count()

173

In [354]:
## Gender distribution in Churned users
df_churned.select("userId","gender").dropDuplicates().groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|   20|
|     M|   32|
+------+-----+



In [355]:
## Gender distribution for non-churned users 
df_not_churned.select("userId","gender").dropDuplicates().groupBy("gender").count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|   84|
|     M|   89|
+------+-----+



In [356]:
## Distribution of paid vs free plan users amongst the churned users 
df_churned.select("userId","level").dropDuplicates().groupBy("level").count().show()

+-----+-----+
|level|count|
+-----+-----+
| free|   46|
| paid|   36|
+-----+-----+



In [357]:
## Distribution of paid vs free plan users amongst the non-churned users 
df_not_churned.select("userId","level").dropDuplicates().groupBy("level").count().show()

+-----+-----+
|level|count|
+-----+-----+
| free|  149|
| paid|  129|
+-----+-----+



In [358]:
# Print out the data schema 
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- registered: timestamp (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- churn: integer (nullable = true)



In [359]:
# To make it easer using the categorical variabels gender and level we create a flag for them as well
# We define a new "female_flag" and set it to 1 if F, and 0 otherwise. 
flag_female = F.udf(lambda x: 1 if x == "F" else 0, T.IntegerType())
# We define a new "paid_plan_flag" and set it to 1 if F, and 0 otherwise. 
paid_plan_flag = F.udf(lambda x: 1 if x == "paid" else 0, T.IntegerType())

df = df.withColumn("female", flag_female("gender"))
df = df.withColumn("paid_plan", flag_female("level"))

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [How to Improve Your Subscription Based Business By Predicting Churn](https://neilpatel.com/blog/improve-by-predicting-churn/) Neil Patel oulines four main feature types that are relevant for predicting churn; 

1. Customer features: basic information about the user/customer (eg. age, gender etc)

2. Support features: information of the user interaction with customer support (satisfaction ratings eg. Number of thumbs up or down, number of errors and help )

3. Usage features: characterization of the customer's usage of the service (eg. level, add friend, add to playlist, downgrades/upgrade counts, total length of songs streamed, number of sessions, length of using the service) 

4. Contextual features: any other contextual inforamtion we have on the customer (eg. device type. location, user agent etc.)

We will use this guide to engineer the features we need for our model. 

In [360]:
# Create a column of number of days using the service from first ts to last ts
# Create a range for each user sorted by timeseries for the sessions 
user_window = Window \
    .partitionBy('userID') \
    .orderBy(F.desc('timestamp')) \
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# The number of days that they have used the service
df = df.withColumn("days_using_service", F.datediff(F.first(df.timestamp).over(user_window), 
                                  F.last(df.timestamp).over(user_window)))
# The last level they had 
df = df.withColumn("last_level_paid",F.last("paid_plan").over(user_window))

In [361]:
df.filter(F.col("userId")==users_churned[0]).select("userId","page","registered","timestamp","days_using_service","last_level_paid","churn").sort("timestamp").toPandas().head(30)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/miniconda2/envs/spark_env/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'VectorAssembler' object has no attribute '_java_obj'


Unnamed: 0,userId,page,registered,timestamp,days_using_service,last_level_paid,churn
0,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:32:53,49,0,0
1,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:37:23,49,0,0
2,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:40:24,49,0,0
3,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:47:38,49,0,0
4,53,Roll Advert,2018-09-27 14:09:24,2018-10-01 20:47:43,49,0,0
5,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:50:47,49,0,0
6,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:55:13,49,0,0
7,53,Home,2018-09-27 14:09:24,2018-10-01 20:55:40,49,0,0
8,53,NextSong,2018-09-27 14:09:24,2018-10-01 20:58:26,49,0,0
9,53,NextSong,2018-09-27 14:09:24,2018-10-01 21:01:54,49,0,0


In [362]:
df.take(1)

[Row(artist=None, auth='Logged In', firstName='Darianna', gender='F', itemInSession=34, lastName='Carpenter', length=None, level='free', location='Bridgeport-Stamford-Norwalk, CT', method='PUT', page='Logout', registration=1538016340000, sessionId=187, song=None, status=307, ts=1542823952000, userAgent='"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53"', userId='100010', registered=datetime.datetime(2018, 9, 27, 4, 45, 40), timestamp=datetime.datetime(2018, 11, 21, 19, 12, 32), churn=0, female=1, paid_plan=0, days_using_service=44, last_level_paid=0)]

In [363]:
## Create new user_df calculating the features 

user_df = df.groupBy("userId").agg(
    
    # Customer related features
    F.max("female").alias("female"),
    
    # Support related features
    F.sum(F.when(F.col("page")=="Thumbs Up", 1).otherwise(0)).alias("thumb_ups"),
    F.sum(F.when(F.col("page")=="Thumbs Down", 1).otherwise(0)).alias("thumb_downs"),
    F.sum(F.when(F.col("page")=="Error", 1).otherwise(0)).alias("errors"),
    F.sum(F.when(F.col("page")=="Help", 1).otherwise(0)).alias("support_enquiries"),
    
    # Usage related features
    F.max("churn").alias("churn"),
    F.max("last_level_paid").alias("last_level_paid"),
    F.sum(F.when(F.col("page")=="Add Firend", 1).otherwise(0)).alias("referrals"),
    F.sum(F.when(F.col("page")=="Add to Playlist", 1).otherwise(0)).alias("playlists"),
    F.sum(F.when(F.col("page")=="Submit Downgrade", 1).otherwise(0)).alias("downgrades"),
    F.sum(F.when(F.col("page")=="Submit Upgrade", 1).otherwise(0)).alias("upgrades"),
    F.sum(F.when(F.col("page")=="NextSong", 1).otherwise(0)).alias("songs"),
    F.mean("itemInSession").alias("avr_songs_per_sessions"),
    F.sum("length").alias("total_length_streamed"),
    F.countDistinct("Artist").alias("artists"),
    F.countDistinct("sessionId").alias("sessions"),
    F.first("days_using_service").alias("days_using_service"),
    
    # Contextual features
    F.countDistinct("userAgent").alias("user_agents"),
    F.countDistinct("location").alias("locations"),
    )

In [364]:
user_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- female: integer (nullable = true)
 |-- thumb_ups: long (nullable = true)
 |-- thumb_downs: long (nullable = true)
 |-- errors: long (nullable = true)
 |-- support_enquiries: long (nullable = true)
 |-- churn: integer (nullable = true)
 |-- last_level_paid: integer (nullable = true)
 |-- referrals: long (nullable = true)
 |-- playlists: long (nullable = true)
 |-- downgrades: long (nullable = true)
 |-- upgrades: long (nullable = true)
 |-- songs: long (nullable = true)
 |-- avr_songs_per_sessions: double (nullable = true)
 |-- total_length_streamed: double (nullable = true)
 |-- artists: long (nullable = false)
 |-- sessions: long (nullable = false)
 |-- days_using_service: integer (nullable = true)
 |-- user_agents: long (nullable = false)
 |-- locations: long (nullable = false)



In [365]:
user_df.take(1)

[Row(userId='100010', female=1, thumb_ups=17, thumb_downs=5, errors=0, support_enquiries=2, churn=0, last_level_paid=0, referrals=0, playlists=7, downgrades=0, upgrades=0, songs=275, avr_songs_per_sessions=35.89501312335958, total_length_streamed=66940.89735000003, artists=252, sessions=7, days_using_service=44, user_agents=1, locations=1)]

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [429]:
## Rename churn to label for use in machine learning
user_df = user_df.withColumnRenamed("churn","label")

## Splitting into train, test and validation set. 
# Split full data set to 80% training data and 20% for rest 

train, rest = user_df.randomSplit([0.8,0.2],seed=123)
# Split further so that 10% test and 10% validation
test,validation = rest.randomSplit([0.5,0.5],seed=123)

## Classifiers 

We will test using three different classifiers for predicting churb (1 vs 0). Tree popular classifiers these types of classification tasks are LogisticRegression (binomial), Decision Tree classifier and Random Forrest Classifier [ref](https://spark.apache.org/docs/latest/ml-classification-regression.html) 

### Logistic Regression

In [430]:
# Vectorize nummerical features
assembler = VectorAssembler(
    inputCols = ["female","thumb_ups","thumb_downs","errors","support_enquiries","last_level_paid",
                 "referrals","playlists","downgrades","upgrades","songs","songs","avr_songs_per_sessions",
                 "total_length_streamed","artists","sessions","days_using_service","user_agents","locations"],
    outputCol = "NumFeatures")

# Normalize nummeric features 
scaler = StandardScaler(inputCol = "NumFeatures",outputCol = "features",withMean=True,withStd =True)

# Build the logistic regression model
lr = LogisticRegression(featuresCol = "features",labelCol = "label", maxIter = 10,regParam =0.5, elasticNetParam=0.8)

# Biild the pipeline
lr_pipeline = Pipeline(stages = [assembler,scaler,lr]) 

In [410]:
# Train the model
lr_model = lr_pipeline.fit(train)

In [411]:
# Predict
lr_results = lr_model.transform(test)

In [391]:
lr_results.head()

Row(userId='11', female=1, thumb_ups=40, thumb_downs=9, errors=1, support_enquiries=3, churn=0, last_level_paid=0, referrals=0, playlists=20, downgrades=1, upgrades=2, songs=647, avr_songs_per_sessions=46.89622641509434, total_length_streamed=159669.96304000012, artists=534, sessions=16, days_using_service=53, user_agents=1, locations=1, NumFeatures=DenseVector([1.0, 40.0, 9.0, 1.0, 3.0, 0.0, 0.0, 20.0, 1.0, 2.0, 647.0, 647.0, 46.8962, 159669.963, 534.0, 16.0, 53.0, 1.0, 1.0]), features=DenseVector([1.1069, -0.1887, -0.1163, 0.0331, -0.4599, 0.0, 0.0, -0.2239, 1.3473, 1.8101, -0.3028, -0.3028, -0.6359, -0.3094, -0.2146, 0.3157, 0.6199, 0.0, 0.0]), rawPrediction=DenseVector([1.1907, -1.1907]), probability=DenseVector([0.7669, 0.2331]), prediction=0.0)

In [412]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(lr_results))

Test Area Under ROC 0.5


In [438]:
def evaluate_pipeline(results):
    true_pos = lr_results.filter((results.prediction ==0.0) & (results.label == 0)).count()
    false_pos = lr_results.filter((results.prediction ==0.0) & (results.label == 1)).count()
    true_neg = lr_results.filter((results.prediction ==1.0) & (results.label == 1)).count()
    false_neg = lr_results.filter((results.prediction ==1.0) & (results.label == 0)).count()
    
    precision = true_pos / (true_pos+false_pos) if (true_pos+false_pos) !=0 else 0
    print(f"Precision: {precision}")
    recall = true_pos / (true_pos+false_neg) if (true_pos+false_neg) != 0 else 0
    print(f"Recall: {recall}")
    f1_measure = (2*precision*recall) / (precision + recall) if (precision + recall) !=0 else 0
    print(f"F1 score: {f1_measure}")
    accuracy = (true_pos+true_neg) / (true_pos+false_pos+true_neg+false_neg)
    print(f"Accuracy: {accuracy}")
    

In [439]:
evaluate_pipeline(lr_results)

Precision: 0.7272727272727273
Recall: 1.0
F1 score: 0.8421052631578948
Accuracy: 0.7272727272727273


### Decision Tree Classifier

In [440]:
# Bild pipeline -> reuse assembler and scaler from above

# classifier 
dtc = DecisionTreeClassifier()

# pipeline 

dtc_pipeline = Pipeline(stages=[assembler,scaler,dtc])

In [441]:
# Train model
dtc_model = dtc_pipeline.fit(train)

In [442]:
# Make predictions 
dtc_results = dtc_model.transform(test)

In [443]:
dtc_results.take(1)

[Row(userId='11', female=1, thumb_ups=40, thumb_downs=9, errors=1, support_enquiries=3, label=0, last_level_paid=0, referrals=0, playlists=20, downgrades=1, upgrades=2, songs=647, avr_songs_per_sessions=46.89622641509434, total_length_streamed=159669.96304000012, artists=534, sessions=16, days_using_service=53, user_agents=1, locations=1, NumFeatures=DenseVector([1.0, 40.0, 9.0, 1.0, 3.0, 0.0, 0.0, 20.0, 1.0, 2.0, 647.0, 647.0, 46.8962, 159669.963, 534.0, 16.0, 53.0, 1.0, 1.0]), features=DenseVector([1.0846, -0.214, -0.1263, -0.0209, -0.4762, 0.0, 0.0, -0.2559, 1.442, 1.8539, -0.3274, -0.3274, -0.6742, -0.3338, -0.2429, 0.2846, 0.6436, 0.0, 0.0]), rawPrediction=DenseVector([67.0, 0.0]), probability=DenseVector([1.0, 0.0]), prediction=0.0)]

In [444]:
# Evaluate 
evaluate_pipeline(dtc_results)

AnalysisException: 'Resolved attribute(s) prediction#177523 missing from label#127153,days_using_service#55974,avr_songs_per_sessions#55966,sessions#55972L,total_length_streamed#55968,errors#55948L,locations#55978L,thumb_downs#55946L,probability#132122,upgrades#55962L,rawPrediction#132098,userId#51839,support_enquiries#55950L,user_agents#55976L,prediction#132147,female#55942,last_level_paid#55954,playlists#55958L,songs#55964L,NumFeatures#132053,referrals#55956L,thumb_ups#55944L,downgrades#55960L,features#132075,artists#55970L in operator !Filter ((prediction#177523 = 0.0) && (label#127153 = 0)). Attribute(s) with the same name appear in the operation: prediction. Please check if the right attribute(s) are used.;;\n!Filter ((prediction#177523 = 0.0) && (label#127153 = 0))\n+- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L, NumFeatures#132053, features#132075, rawPrediction#132098, probability#132122, UDF(rawPrediction#132098) AS prediction#132147]\n   +- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L, NumFeatures#132053, features#132075, rawPrediction#132098, UDF(rawPrediction#132098) AS probability#132122]\n      +- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L, NumFeatures#132053, features#132075, UDF(features#132075) AS rawPrediction#132098]\n         +- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L, NumFeatures#132053, UDF(NumFeatures#132053) AS features#132075]\n            +- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L, UDF(named_struct(female_double_VectorAssembler_36545c496291, cast(female#55942 as double), thumb_ups_double_VectorAssembler_36545c496291, cast(thumb_ups#55944L as double), thumb_downs_double_VectorAssembler_36545c496291, cast(thumb_downs#55946L as double), errors_double_VectorAssembler_36545c496291, cast(errors#55948L as double), support_enquiries_double_VectorAssembler_36545c496291, cast(support_enquiries#55950L as double), last_level_paid_double_VectorAssembler_36545c496291, cast(last_level_paid#55954 as double), referrals_double_VectorAssembler_36545c496291, cast(referrals#55956L as double), playlists_double_VectorAssembler_36545c496291, cast(playlists#55958L as double), downgrades_double_VectorAssembler_36545c496291, cast(downgrades#55960L as double), upgrades_double_VectorAssembler_36545c496291, cast(upgrades#55962L as double), songs_double_VectorAssembler_36545c496291, cast(songs#55964L as double), songs_double_VectorAssembler_36545c496291, cast(songs#55964L as double), ... 14 more fields)) AS NumFeatures#132053]\n               +- Sample 0.0, 0.5, false, 123\n                  +- Sort [userId#51839 ASC NULLS FIRST, female#55942 ASC NULLS FIRST, thumb_ups#55944L ASC NULLS FIRST, thumb_downs#55946L ASC NULLS FIRST, errors#55948L ASC NULLS FIRST, support_enquiries#55950L ASC NULLS FIRST, label#127153 ASC NULLS FIRST, last_level_paid#55954 ASC NULLS FIRST, referrals#55956L ASC NULLS FIRST, playlists#55958L ASC NULLS FIRST, downgrades#55960L ASC NULLS FIRST, upgrades#55962L ASC NULLS FIRST, songs#55964L ASC NULLS FIRST, avr_songs_per_sessions#55966 ASC NULLS FIRST, total_length_streamed#55968 ASC NULLS FIRST, artists#55970L ASC NULLS FIRST, sessions#55972L ASC NULLS FIRST, days_using_service#55974 ASC NULLS FIRST, user_agents#55976L ASC NULLS FIRST, locations#55978L ASC NULLS FIRST], false\n                     +- Sample 0.8, 1.0, false, 123\n                        +- Sort [userId#51839 ASC NULLS FIRST, female#55942 ASC NULLS FIRST, thumb_ups#55944L ASC NULLS FIRST, thumb_downs#55946L ASC NULLS FIRST, errors#55948L ASC NULLS FIRST, support_enquiries#55950L ASC NULLS FIRST, label#127153 ASC NULLS FIRST, last_level_paid#55954 ASC NULLS FIRST, referrals#55956L ASC NULLS FIRST, playlists#55958L ASC NULLS FIRST, downgrades#55960L ASC NULLS FIRST, upgrades#55962L ASC NULLS FIRST, songs#55964L ASC NULLS FIRST, avr_songs_per_sessions#55966 ASC NULLS FIRST, total_length_streamed#55968 ASC NULLS FIRST, artists#55970L ASC NULLS FIRST, sessions#55972L ASC NULLS FIRST, days_using_service#55974 ASC NULLS FIRST, user_agents#55976L ASC NULLS FIRST, locations#55978L ASC NULLS FIRST], false\n                           +- Project [userId#51839, female#55942, thumb_ups#55944L, thumb_downs#55946L, errors#55948L, support_enquiries#55950L, churn#55952 AS label#127153, last_level_paid#55954, referrals#55956L, playlists#55958L, downgrades#55960L, upgrades#55962L, songs#55964L, avr_songs_per_sessions#55966, total_length_streamed#55968, artists#55970L, sessions#55972L, days_using_service#55974, user_agents#55976L, locations#55978L]\n                              +- Aggregate [userId#51839], [userId#51839, max(female#53364) AS female#55942, sum(cast(CASE WHEN (page#51832 = Thumbs Up) THEN 1 ELSE 0 END as bigint)) AS thumb_ups#55944L, sum(cast(CASE WHEN (page#51832 = Thumbs Down) THEN 1 ELSE 0 END as bigint)) AS thumb_downs#55946L, sum(cast(CASE WHEN (page#51832 = Error) THEN 1 ELSE 0 END as bigint)) AS errors#55948L, sum(cast(CASE WHEN (page#51832 = Help) THEN 1 ELSE 0 END as bigint)) AS support_enquiries#55950L, max(churn#53231) AS churn#55952, max(last_level_paid#53442) AS last_level_paid#55954, sum(cast(CASE WHEN (page#51832 = Add Firend) THEN 1 ELSE 0 END as bigint)) AS referrals#55956L, sum(cast(CASE WHEN (page#51832 = Add to Playlist) THEN 1 ELSE 0 END as bigint)) AS playlists#55958L, sum(cast(CASE WHEN (page#51832 = Submit Downgrade) THEN 1 ELSE 0 END as bigint)) AS downgrades#55960L, sum(cast(CASE WHEN (page#51832 = Submit Upgrade) THEN 1 ELSE 0 END as bigint)) AS upgrades#55962L, sum(cast(CASE WHEN (page#51832 = NextSong) THEN 1 ELSE 0 END as bigint)) AS songs#55964L, avg(itemInSession#51826L) AS avr_songs_per_sessions#55966, sum(length#51828) AS total_length_streamed#55968, count(distinct Artist#51822) AS artists#55970L, count(distinct sessionId#51834L) AS sessions#55972L, first(days_using_service#53414, false) AS days_using_service#55974, count(distinct userAgent#51838) AS user_agents#55976L, count(distinct location#51830) AS locations#55978L]\n                                 +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388, days_using_service#53414, last_level_paid#53442]\n                                    +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388, days_using_service#53414, ... 2 more fields]\n                                       +- Window [last(paid_plan#53388, false) windowspecdefinition(userID#51839, timestamp#53110 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), unboundedfollowing$())) AS last_level_paid#53442], [userID#51839], [timestamp#53110 DESC NULLS LAST]\n                                          +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388, days_using_service#53414]\n                                             +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388, days_using_service#53414]\n                                                +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388, _we0#53415, ... 2 more fields]\n                                                   +- Window [first(timestamp#53110, false) windowspecdefinition(userID#51839, timestamp#53110 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#53415, last(timestamp#53110, false) windowspecdefinition(userID#51839, timestamp#53110 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#53416], [userID#51839], [timestamp#53110 DESC NULLS LAST]\n                                                      +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, paid_plan#53388]\n                                                         +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, female#53364, <lambda>(level#51829) AS paid_plan#53388]\n                                                            +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, churn#53231, <lambda>(gender#51825) AS female#53364]\n                                                               +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, timestamp#53110, <lambda>(page#51832) AS churn#53231]\n                                                                  +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, registered#53082, cast((cast(ts#51837L as double) / cast(1000 as double)) as timestamp) AS timestamp#53110]\n                                                                     +- Project [artist#51822, auth#51823, firstName#51824, gender#51825, itemInSession#51826L, lastName#51827, length#51828, level#51829, location#51830, method#51831, page#51832, registration#51833L, sessionId#51834L, song#51835, status#51836L, ts#51837L, userAgent#51838, userId#51839, cast((cast(registration#51833L as double) / cast(1000 as double)) as timestamp) AS registered#53082]\n                                                                        +- Filter NOT (userId#51839 = )\n                                                                           +- Filter AtLeastNNulls(n, userId#51839,sessionId#51834L)\n                                                                              +- Relation[artist#51822,auth#51823,firstName#51824,gender#51825,itemInSession#51826L,lastName#51827,length#51828,level#51829,location#51830,method#51831,page#51832,registration#51833L,sessionId#51834L,song#51835,status#51836L,ts#51837L,userAgent#51838,userId#51839] json\n'

### Random Forest Classifier

In [None]:
# Build pipeline 

# Classifier
rfc = RandomForrestClassifier

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.