In [47]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, isnull
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler

from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import time
import matplotlib.pyplot as plt
import seaborn as sns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [4]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

26259199

In [6]:
df.describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------------+----------+---------+--------+------------------+--------+------------------+--------+--------------+--------+--------+--------------------+------------------+--------------------+------------------+--------------------+--------------------+------------------+
|summary|            artist|      auth|firstName|  gender|     itemInSession|lastName|            length|   level|      location|  method|    page|        registration|         sessionId|                song|            status|                  ts|           userAgent|            userId|
+-------+------------------+----------+---------+--------+------------------+--------+------------------+--------+--------------+--------+--------+--------------------+------------------+--------------------+------------------+--------------------+--------------------+------------------+
|  count|          20850272|  26259199| 25480720|25480720|          26259199|25480720|          20850272|26259199|      25480720|2625

In [7]:
df.first()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [8]:
for col in df.columns:
    missing_count = df.filter((isnan(df[col])) | (df[col].isNull()) | (df[col] == "")).count()
    print(col + ": " + str(missing_count))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

artist: 5408927
auth: 0
firstName: 778479
gender: 778479
itemInSession: 0
lastName: 778479
length: 5408927
level: 0
location: 778479
method: 0
page: 0
registration: 778479
sessionId: 0
song: 5408927
status: 0
ts: 0
userAgent: 778479
userId: 0

In [9]:
df.select('auth').describe().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------+
|summary|      auth|
+-------+----------+
|  count|  26259199|
|   mean|      null|
| stddev|      null|
|    min| Cancelled|
|    max|Logged Out|
+-------+----------+

In [10]:
df.filter(df.auth == 'Cancelled').count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5003

In [11]:
#Creating churn column, idea for method taken from https://github.com/Minsifye/Sparkify/blob/master/Sparkify.ipynb
churned_users = [(row['userId']) for row in df.filter(df.page=="Cancellation Confirmation").select('userId').dropDuplicates().collect()]

df = df.withColumn("Churn", df.userId.isin(churned_users))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
#Keep relevant columns and transform them to numerical formats
df = df.select(['artist', 'gender', 'length', 'level', 'userId', 'song', 'page', 'Churn'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
df.columns

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['artist', 'gender', 'length', 'level', 'userId', 'song', 'page', 'Churn']

In [14]:
def transform(df):
    #Gender column
    df = df.replace(['M', 'F'], ['0', '1'], 'gender')
    df = df.withColumn('gender', df["gender"].cast(IntegerType()))
    
    #Level column
    df = df.replace(['free', 'paid'], ['0', '1'], 'level')
    df = df.withColumn('level', df['level'].cast(IntegerType()))

    #Churn column
    df = df.withColumn('Churn', df['Churn'].cast(IntegerType()))
    
    return df

df = transform(df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- length: double (nullable = true)
 |-- level: integer (nullable = true)
 |-- userId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- page: string (nullable = true)
 |-- Churn: integer (nullable = true)

In [16]:
#Taking features that are ready into a seperate dataframe
X = df.groupBy(["userId", "Churn", "gender", "level"]).count()
X.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+------+-----+-----+
| userId|Churn|gender|level|count|
+-------+-----+------+-----+-----+
|1113656|    0|     1|    0| 1100|
|1090937|    1|     0|    0|   53|
|1199569|    0|     0|    1| 2343|
|1874380|    0|     1|    0|   39|
|1802358|    0|     0|    1| 1670|
|1210377|    0|     0|    0|  148|
|1358803|    0|     1|    0|  179|
|1671580|    0|     0|    0|  266|
|1342050|    0|     0|    0|  603|
|1097039|    0|     0|    0|  519|
|1225467|    0|     1|    0|  473|
|1926144|    1|     1|    0|  121|
|1929162|    1|     1|    0|  235|
|1099620|    1|     1|    0|  403|
|1382223|    1|     1|    1|  209|
|1064593|    1|     0|    1| 3430|
|1932534|    0|     1|    0|   78|
|1867226|    0|     0|    1| 1288|
|1833354|    1|     1|    1|  462|
|1560429|    0|     0|    0|  371|
+-------+-----+------+-----+-----+
only showing top 20 rows

In [17]:
#Turn each action in the page column into a column, idea taken from https://github.com/Minsifye/Sparkify/blob/master/Sparkify.ipynb
action_list = [row['page'] for row in df.select('page').dropDuplicates().collect()]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#Remove actions which will not be helpful
actions_remove = ['Cancel', 'Home', 'Save Settings', 'Cancellation Confirmation', 'About', 'Settings', 'Submit Registration', 'Register']
for action in actions_remove:
    action_list.remove(action)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
action_list

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['Downgrade', 'Error', 'Add to Playlist', 'Help', 'Submit Upgrade', 'Submit Downgrade', 'Thumbs Down', 'Roll Advert', 'Upgrade', 'Login', 'NextSong', 'Thumbs Up', 'Logout', 'Add Friend']

In [20]:
#Turning each action into a seperate column with counts
for action in action_list:
    df_temp = df.filter(df.page == action).groupBy(df.userId).count()
    df_temp = df_temp.withColumnRenamed('count', action)
    X = X.join(df_temp, 'userId', how='left')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
X = X.na.fill(0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
#Remove columns which will not be fed into the model
columns_to_drop = ['userId', 'count']
X = X.drop(*columns_to_drop)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
print("Total number of rows: " + str(X.count()))
print("Total number of churned rows: " + str(X.filter(X.Churn == 1).count()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total number of rows: 34978
Total number of churned rows: 8038

In [56]:
#Turning all feature columns into one column
assembler = VectorAssembler(inputCols = X.columns[1:], outputCol = "features_vector")
data = assembler.transform(X).select('features_vector', 'Churn')

#Standardize the values in the feature column
std_scaler = StandardScaler(inputCol="features_vector", outputCol="features", withStd=True)
scalerModel = std_scaler.fit(data)
final_data = scalerModel.transform(data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
train_data, test_data = final_data.randomSplit([.8, .2])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
gbt = GBTClassifier(featuresCol = 'features', labelCol = 'Churn')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [67]:
start_time = time.time()
gbt_model = gbt.fit(train_data)
print((time.time() - start_time)/60.0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-67:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 6382



7.937770239512125

In [68]:
gbt_predictions = gbt_model.transform(test_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
accuracy = MulticlassClassificationEvaluator(labelCol = 'Churn', predictionCol = "prediction", metricName = "accuracy")
f1 = MulticlassClassificationEvaluator(labelCol = 'Churn', predictionCol = "prediction", metricName = 'f1')
recall = MulticlassClassificationEvaluator(labelCol = 'Churn', predictionCol = "prediction", metricName = 'weightedRecall')
auc = BinaryClassificationEvaluator(labelCol = 'Churn', metricName = 'areaUnderROC')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
gbt_accuracy = accuracy.evaluate(gbt_predictions)

gbt_f1 = f1.evaluate(gbt_predictions)

gbt_recall = recall.evaluate(gbt_predictions)

gbt_auc = auc.evaluate(gbt_predictions)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-70:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 9068



In [71]:
print("Accuracy: " + str(gbt_accuracy))
print("F1: " + str(gbt_f1))
print("Recall: " + str(gbt_recall))
print("Area Under Curve: " + str(gbt_auc))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy: 0.7795431976166832
F1: 0.7160634929192435
Recall: 0.7795431976166832
Area Under Curve: 0.7083980293303682

In [72]:
for feature, importance in zip(features, importances):
    print(feature + ": " + str(importance))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Churn: 0.9999999999999961
gender: 8.762679282155424e-16
level: 3.3492692255940937e-16
count: 1.1932627409213092e-15
Downgrade: 2.7418504785719615e-17
Error: 2.6313993226983034e-16
Add to Playlist: 5.156606124272013e-17
Help: 1.4896318724038362e-17
Submit Upgrade: 7.478787199443776e-17
Submit Downgrade: 1.8176968925292573e-17
Thumbs Down: 5.148002188043056e-18
Roll Advert: 6.301199799301066e-19
Upgrade: 2.2175358846265635e-16
Login: 0.0
NextSong: 2.754958758100056e-16
Thumbs Up: 5.541344615239192e-16
Logout: 7.605286223947495e-17
Add Friend: 1.2341578468427498e-16