In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql import types as t
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Imputer, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1621253859991_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load and Clean Dataset

In [3]:
# full dataset
event_data = 's3n://udacity-dsnd/sparkify/sparkify_event_data.json'
# mini dataset
# evnent_data = s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df = spark.read.json(event_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [6]:
# the size of the dataset
print((df.count(), len(df.columns)))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(26259199, 18)

### How many nulls?

In [7]:
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------
 artist        | 5408927 
 auth          | 0       
 firstName     | 778479  
 gender        | 778479  
 itemInSession | 0       
 lastName      | 778479  
 length        | 5408927 
 level         | 0       
 location      | 778479  
 method        | 0       
 page          | 0       
 registration  | 778479  
 sessionId     | 0       
 song          | 5408927 
 status        | 0       
 ts            | 0       
 userAgent     | 778479  
 userId        | 0

### Define Churn
Here the `Cancellation Confirmation` page is used to define churn. This occurs for both free and paid users. 

In [8]:
# this is a binary, indicator column to be used to predict churn
df = df.withColumn(
    'churn',
    F.when((F.col('page') =='Cancellation Confirmation'), 1) \
    .otherwise(0)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Explore Data

How many users 'churned' over all? Are cancellations the same between free and paid users? 

In [9]:
# users who have a cancellation value of 1
users_who_quit = df.filter(df.churn==1) \
    .select(df.userId) \
    .dropDuplicates()

# users who have a cancellation value of 0
users_who_stayed = df.filter(df.churn==0) \
    .select(df.userId) \
    .dropDuplicates()

# examine their lengths
users_who_quit.count(), users_who_stayed.count(), ( users_who_quit.count() / (users_who_quit.count() + users_who_stayed.count())) * 100

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(5003, 22278, 18.338770572926215)

In [10]:
# are cancellation rates the same between free and paid users? 
churn_table = df.select(df.level, df.churn, df.userId) \
    .dropDuplicates() \
    .groupBy(F.col('level'),F.col('churn')) \
    .count() \
    .withColumnRenamed('count','user_count')

churn_table.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----+----------+
|level|churn|user_count|
+-----+-----+----------+
| paid|    0|     16185|
| free|    0|     18793|
| paid|    1|      3424|
| free|    1|      1579|
+-----+-----+----------+

Twice the number of `paid` users churned versus `free` users

In [11]:
( 3424 / ( 3424 + 16185)) * 100, ( 1579 / ( 1579 + 18793)) * 100

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(17.461369779183027, 7.75083447869625)

# Feature Engineering

The approach is to split the text fields for `userAgent` and `location` which will yield device information and city/state information.

In [12]:
# create browser, os, brand from userAgent
df = df.withColumn('browser', F.split(df['userAgent'], '\(').getItem(0)) \
    .withColumn('temp', F.split(df['userAgent'], '\(').getItem(1)) 

df = df.withColumn('os', F.split(df['temp'], ';').getItem(0)) \
    .withColumn('brand', F.split(df['temp'], ';').getItem(1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# split the 'location' field into 'city' and 'state fields'
df = df.withColumn('city', F.split(df['location'], ',').getItem(0)) \
    .withColumn('state', F.split(df['location'], ',').getItem(1))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Every event ( for every user ) has a timestamp associated with it. Here I group by user and difference the timestamps to show the amount of time ( in seconds ) between each event. 

In [14]:
# create window partition
w = Window.partitionBy("userId").orderBy("sessionId")
# calculate lag
df = df.withColumn("lead", F.lag('ts', 1).over(w)) \
    .withColumn("tsDiff", ( F.col('ts') - F.col('lead')) / 1000)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# show distinct values for each column and display vertically
df.agg(*(F.countDistinct(c).alias(c) for c in df.columns)).show(vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------
 artist        | 38337   
 auth          | 4       
 firstName     | 5467    
 gender        | 2       
 itemInSession | 1429    
 lastName      | 1000    
 length        | 23748   
 level         | 2       
 location      | 886     
 method        | 2       
 page          | 22      
 registration  | 22247   
 sessionId     | 228713  
 song          | 253564  
 status        | 3       
 ts            | 5191762 
 userAgent     | 85      
 userId        | 22278   
 churn         | 2       
 browser       | 2       
 temp          | 58      
 os            | 12      
 brand         | 39      
 city          | 816     
 state         | 100     
 lead          | 5191517 
 tsDiff        | 376477

### Create Pivots for Feature Engineering
Most columns contain many distinct values. Because I am interested in determining whether a _future_ `user` will churn or not, I want to aggregate this information at the `user` level (i.e., one row per user, with a 1 or 0 in the `churn` column ). 

Here I pivot the `page` column using the `tsdiff` as my metric to be aggregated. 

In [16]:
# create churn table
user_churn_df = df.select("userId","churn") \
    .groupBy("userId") \
    .sum("churn") \
    .withColumnRenamed("sum(churn)","churn")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# create page table
user_page_df = df.select("userId","page","tsDiff") \
    .filter(df.page != "Cancellation Confirmation") \
    .filter(df.page != "Cancel") \
    .groupBy("userId") \
    .pivot("page") \
    .avg("tsDiff")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# create level table
user_level = df.select("userId","level","sessionId") \
    .groupBy("userId") \
    .pivot("level") \
    .count()

user_level = user_level.select(F.col("userId"), F.col("free").cast(t.DoubleType()), F.col("paid").cast(t.DoubleType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# create location table
user_location = df.select("userId","city","state","browser","os","brand") \
    .dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# join all the tables together
user_data = user_churn_df.join(user_page_df, ['userId'], "left") \
    .join(user_level, ['userId'], "left") \
    .join(user_location, ['userId'], "left") \
    .drop("userId")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# set all null values in categorical columns to 'missing'
# set all null values in numeric columns to 0
user_data = user_data.fillna({'city':'missing', 'state':'missing', 'browser':'missing', 'os':'missing','brand':'missing'})

user_data = user_data.fillna({
    'churn':0, 'About':0, 'Add Friend':0, 'Add to Playlist':0, 'Downgrade':0, 'Error':0, 
    'Help':0, 'Home':0, 'Login':0, 'Logout':0, 'NextSong':0, 'Register':0, 'Roll Advert':0, 
    'Save Settings': 0, 'Settings':0, 'Submit Downgrade':0, 'Submit Registration':0, 'Submit Upgrade':0, 
    'Thumbs Down':0, 'Thumbs Up':0, 'Upgrade':0, 'free':0, 'paid':0})

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# rename 'churn' to 'label'
user_data = user_data.withColumnRenamed("churn","label")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Modeling

The data is ready to be modeled. I will use logistic regression on my user-level data. `label` is the column which defines whether the user churned or not. There are several columns that correspond to the pivot of the original `page` column (About, Add, etc. ) and use the timestamp-difference as their metric. I have other counts of events ( for 'free' and 'paid' ) as well as for the location column pivot (city, state) and `browser` pivot. 

### Columns used: 

In [23]:
user_data.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- label: long (nullable = false)
 |-- About: double (nullable = false)
 |-- Add Friend: double (nullable = false)
 |-- Add to Playlist: double (nullable = false)
 |-- Downgrade: double (nullable = false)
 |-- Error: double (nullable = false)
 |-- Help: double (nullable = false)
 |-- Home: double (nullable = false)
 |-- Login: double (nullable = false)
 |-- Logout: double (nullable = false)
 |-- NextSong: double (nullable = false)
 |-- Register: double (nullable = false)
 |-- Roll Advert: double (nullable = false)
 |-- Save Settings: double (nullable = false)
 |-- Settings: double (nullable = false)
 |-- Submit Downgrade: double (nullable = false)
 |-- Submit Registration: double (nullable = false)
 |-- Submit Upgrade: double (nullable = false)
 |-- Thumbs Down: double (nullable = false)
 |-- Thumbs Up: double (nullable = false)
 |-- Upgrade: double (nullable = false)
 |-- free: double (nullable = false)
 |-- paid: double (nullable = false)
 |-- city: string (nullable = false)
 

In [24]:
# there are 22,278 users
print(user_data.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22278

### Split into training / testing sets

Because each row is a distinct user it is easy to split hte data into training and testing sets. 

In [25]:
training, testing = user_data.randomSplit([0.9, 0.1], seed=100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Pipeline

In [None]:
# encode categories in pipeline
categ_pipeline = Pipeline(stages=[
    StringIndexer(inputCol="city",outputCol="cityIndex", handleInvalid="skip"),
    StringIndexer(inputCol="state",outputCol="stateIndex", handleInvalid="skip"),
    StringIndexer(inputCol="browser",outputCol="osIndex", handleInvalid="skip"),
    StringIndexer(inputCol="os",outputCol="browserIndex", handleInvalid="skip"),
    StringIndexer(inputCol="brand",outputCol="brandIndex", handleInvalid="skip"),
    VectorAssembler(
        inputCols=["cityIndex","stateIndex","osIndex","browserIndex","brandIndex"], 
        outputCol="categ_features"
    )
])

In [26]:
# encode categories in pipeline
categ_pipeline = Pipeline(stages=[
    StringIndexer(inputCols=['city','state','browser','os','brand'],outputCols=['cityIndex','stateIndex','osIndex','browserIndex','brandIndex'], handleInvalid='skip'),
    VectorAssembler(
        inputCols=["cityIndex","stateIndex","osIndex","browserIndex","brandIndex"], 
        outputCol="categ_features"
    )
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# nuermic columns
numeric_cols = user_page_df.columns[1:] + user_level.columns[1:]
# numeric pipeline
numer_pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=numeric_cols, outputCol="numer_features")
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# unite pipelines ( final features have to be named 'features')
# unite pipelines
final_pipeline = Pipeline(stages=[
    categ_pipeline,
    numer_pipeline,
    VectorAssembler(
        inputCols=["categ_features","numer_features"],
        outputCol="features"
    ),
    LogisticRegression(maxIter=10)
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Cross-Validate

In [29]:
# define parameters ( just one )
paramGrid = ParamGridBuilder() \
    .addGrid(LogisticRegression.regParam, [0.1, 0.05, 0.01]) \
    .build()

# optimize 3 fold cross-validation using the area under ROC curve
crossval = CrossValidator(estimator=final_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol='label',metricName='areaUnderROC'),
                          numFolds=2) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
cvModel = crossval.fit(training)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# certainly looks like the regParam is fine at 0.1
cvModel.avgMetrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.7751949878121165, 0.775185330060725, 0.7751943482456591]

### Fit on all training data using regParam = 0.1

In [32]:
# unite pipelines ( final features have to be named 'features')
# unite pipelines
final_pipeline2 = Pipeline(stages=[
    categ_pipeline,
    numer_pipeline,
    VectorAssembler(
        inputCols=["categ_features","numer_features"],
        outputCol="features"
    ),
    LogisticRegression(maxIter=10, regParam=0.1)
])

pipelineModel = final_pipeline2.fit(training)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
summaryPipeline = pipelineModel.stages[-1].summary

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
summaryPipeline.accuracy

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.7762258692073627

In [36]:
summaryPipeline.areaUnderROC

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0.7459488742662125

In [37]:
# save roc data
roc_df = pipelineModel.stages[-1].summary.roc

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
roc_df.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+
|                 FPR|                 TPR|
+--------------------+--------------------+
|                 0.0|                 0.0|
|0.003405950774371827|0.007356219349086...|
|0.007840113103270999|0.014712438698172091|
|0.010667694878221194|0.022737405260811413|
|0.013945119208277103| 0.03366027641551494|
|  0.0167727009832273|0.045697726259473916|
|0.020178651757599125| 0.05840392331698618|
| 0.02422723475355054| 0.06665180561747659|
|0.028083028083028084| 0.07601426660722246|
|0.031231925968768073| 0.08604547481052162|
| 0.03418803418803419| 0.09786000891662952|
| 0.03611593085277296| 0.11234953187695051|
| 0.03791530107319581|  0.1268390548372715|
|0.040614356403830085|  0.1388765046812305|
|0.042799305957200696| 0.14957646009808293|
| 0.04536983484351905| 0.16384306732055284|
|0.047169205063941906|  0.1821221578243424|
|  0.0499967868388921|  0.1963887650468123|
| 0.05179615705931495| 0.20976370931787786|
| 0.05417389627915944| 0.2233615

In [42]:
# there definitely seems to be a browser issue
print("coefficients: " + str(pipelineModel.stages[-1].coefficientMatrix))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

coefficients: DenseMatrix([[-8.87449842e-06,  1.04839756e-03,  1.27292957e-03,
              -3.89446486e-03,  1.05170962e-03, -5.64071471e-07,
              -6.69006951e-02, -4.86610949e-06, -1.08559312e-05,
              -1.49422038e-06, -4.94776114e-07, -1.28371334e-06,
              -4.76002682e-04, -1.17992206e-01, -2.13477950e-05,
               4.18354763e-05, -1.10586623e-05, -1.31596707e-01,
               1.21359388e-06, -4.67655746e-02,  1.40929771e+00,
               4.63190821e-02, -8.59059618e-06, -2.05167384e-05,
              -8.39993873e-06,  8.99917601e-06, -2.82237526e-05]])

In [43]:
coeffs = pipelineModel.stages[-1].coefficientMatrix

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [61]:
import pandas as pd

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

No module named 'pandas'
Traceback (most recent call last):
ModuleNotFoundError: No module named 'pandas'



In [87]:
odds_ratios = np.around(np.exp(coeffs.toArray()),2).tolist()
feature_list = ['cityIndex','stateIndex','osIndex','browserIndex','brandIndex'] + numeric_cols

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [88]:
odds_ratios_df = spark.createDataFrame(
    [ t for t in zip(odds_ratios[0], feature_list)],
    ["or", "feature"]
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [90]:
odds_ratios_df.show(30)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------------+
|  or|            feature|
+----+-------------------+
| 1.0|          cityIndex|
| 1.0|         stateIndex|
| 1.0|            osIndex|
| 1.0|       browserIndex|
| 1.0|         brandIndex|
| 1.0|              About|
|0.94|         Add Friend|
| 1.0|    Add to Playlist|
| 1.0|          Downgrade|
| 1.0|              Error|
| 1.0|               Help|
| 1.0|               Home|
| 1.0|              Login|
|0.89|             Logout|
| 1.0|           NextSong|
| 1.0|           Register|
| 1.0|        Roll Advert|
|0.88|      Save Settings|
| 1.0|           Settings|
|0.95|   Submit Downgrade|
|4.09|Submit Registration|
|1.05|     Submit Upgrade|
| 1.0|        Thumbs Down|
| 1.0|          Thumbs Up|
| 1.0|            Upgrade|
| 1.0|               free|
| 1.0|               paid|
+----+-------------------+