# Sparkify Project

## Introduction

Sparkify is a fictional music streaming application. This project uses Spark to manipulate a dataset of user activity, engineer relevant features, and build machine learning models with Spark MLlib that predict churn.

## Problem or Idea Definition

Predicting churn rates is a challenging and common problem that data scientists and analysts regularly encounter in any customer-facing business. Additionally, the ability to efficiently manipulate large datasets with Spark is one of the highest-demand skills in the field of data.

This project develops two essential data analytics skills, namely:
- Load large datasets into Spark and manipulate them using Spark SQL and Spark Dataframes
- Use the machine learning APIs within Spark ML to build and tune models

## Project Details

For this project, I deployed a Spark cluster in the cloud on AWS. The analysis uses a dataset provided by Udacity.
The main tasks in this project are as follows:
- Gathering data
- Assessing and cleaning data
- Exploratory data analysis
- Engineering features
- Training and tuning ML classifiers
- Analyzing results

## Data Source

A small subset (128 MB) of the full dataset (12 GB) is available for local exploration. The run recorded in this notebook loads the full dataset from S3.


## System and Libraries

In [None]:
sc.install_pypi_package("matplotlib==3.1.2")
sc.install_pypi_package("pandas==0.25.3")

In [None]:
sc.install_pypi_package("seaborn")

In [79]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, isnull, isnan, when, count
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re
import datetime
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

In [80]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Capstone project") \
    .getOrCreate()

In [81]:
debug = True

## Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [83]:
#Full Sparkify Dataset: s3n://udacity-dsnd/sparkify/sparkify_event_data.json
#Mini Sparkify Dataset: s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json
user_log = spark.read.json('s3n://udacity-dsnd/sparkify/sparkify_event_data.json')
user_log.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [None]:
# Exploring data structure
if debug:
    user_log.printSchema()

In [84]:
user_log.count()

26259199

In [85]:
# Drop rows with missing values in the key columns
user_log_valid = user_log.dropna(how = "any", subset = ["userId", "sessionId", "page", "gender", "status", "level"])

In [86]:
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

In [87]:
user_log_valid.count()

25480720

## Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [None]:
if debug:
    user_log_valid.select("userId").dropDuplicates().sort("userId").show()

In [88]:
# Check all events
user_log_valid.groupBy('Page').count().show(truncate=False)

+-------------------------+--------+
|Page                     |count   |
+-------------------------+--------+
|Cancel                   |5003    |
|Submit Downgrade         |6494    |
|Thumbs Down              |239212  |
|Home                     |933124  |
|Downgrade                |184240  |
|Roll Advert              |385212  |
|Logout                   |296005  |
|Save Settings            |29516   |
|Cancellation Confirmation|5003    |
|About                    |48377   |
|Settings                 |147074  |
|Add to Playlist          |597921  |
|Add Friend               |381664  |
|NextSong                 |20850272|
|Thumbs Up                |1151465 |
|Help                     |129448  |
|Upgrade                  |50507   |
|Error                    |25048   |
|Submit Upgrade           |15135   |
+-------------------------+--------+

In [89]:
#Total number of the unique users
user_log_valid.select("userId").dropDuplicates().count()

22277

In [None]:
# Create a view, in order to leverage the SQL syntax where applicable
#user_log_valid.createOrReplaceTempView('logs')

In [None]:
# We see how many unique users are present in this dataset
#spark.sql('SELECT COUNT(DISTINCT userId) FROM logs').show()

### Users who cancelled

In [90]:
# Users who reached the Cancellation Confirmation page count as churned
churn_users = user_log_valid.filter(user_log_valid.page=="Cancellation Confirmation").select("userId").dropDuplicates()
churn_users_list = [(row['userId']) for row in churn_users.collect()] 

In [91]:
# Users who submitted a downgrade at least once
downgrade_users = user_log_valid.filter(user_log_valid.page=="Submit Downgrade").select("userId").dropDuplicates()
downgrade_users_list = [(row['userId']) for row in downgrade_users.collect()]

In [92]:
user_log_with_churn = user_log_valid.withColumn("churn", user_log_valid.userId.isin(churn_users_list))
user_log_with_churn = user_log_with_churn.withColumn("downGrade", user_log_with_churn.userId.isin(downgrade_users_list))

In [None]:
#user_log_with_churn = user_log_with_churn.select([col(c).cast("integer") for c in ["churn", "downGrade"]])

In [None]:
user_log_with_churn.select("userId").filter("churn = false").dropDuplicates().count()

In [93]:
# Derive human-readable time fields from the epoch-millisecond timestamp
get_time = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
user_log_with_churn = user_log_with_churn.withColumn("time", get_time(user_log_valid.ts))

get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
user_log_with_churn = user_log_with_churn.withColumn("hour", get_hour(user_log_valid.ts))

get_weekday = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%w"))
user_log_with_churn = user_log_with_churn.withColumn("weekday", get_weekday(user_log_valid.ts))

get_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day)
user_log_with_churn = user_log_with_churn.withColumn("day", get_day(user_log_valid.ts))
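
To make the four derived fields concrete, here is a plain-Python sketch of what the lambdas compute for a single value. The sample timestamp is hypothetical, and the sketch pins the timezone to UTC so the result is reproducible, whereas the lambdas above use the local timezone of the machine that runs them. Note also that `udf` returns strings by default when no return type is given.

```python
from datetime import datetime, timezone


def describe_timestamp(ts_ms: int) -> dict:
    """Break an epoch-milliseconds value into the fields derived above (in UTC)."""
    moment = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return {
        "time": moment.strftime("%Y-%m-%d %H:%M:%S"),
        "hour": moment.hour,
        "weekday": moment.strftime("%w"),  # "0" is Sunday, "6" is Saturday
        "day": moment.day,
    }


# Hypothetical sample value, not taken from the dataset
print(describe_timestamp(1538352117000))
# {'time': '2018-10-01 00:01:57', 'hour': 0, 'weekday': '1', 'day': 1}
```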

In [94]:
# Add state information (last two characters of the location)
get_location=udf(lambda x:x[-2:])
user_log_with_churn = user_log_with_churn.withColumn("location_state", get_location(user_log_valid.location))
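
The slice `x[-2:]` keeps only the last two characters. The sketch below shows that behaviour on hypothetical location strings (their format is an assumption about the data) and adds a guard for missing values, which the lambda above does not have.

```python
def state_suffix(location):
    """Return the last two characters of a location string, or None if it is missing."""
    if location is None:
        return None
    return location[-2:]


# Hypothetical sample strings
print(state_suffix("Springfield, IL"))     # IL
print(state_suffix("Kansas City, MO-KS"))  # KS (only the last state survives)
print(state_suffix(None))                  # None
```

If a location lists several states, this approach assigns the user to the last one only, which may or may not be the intended grouping.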

In [None]:
if debug:
    user_log_with_churn.select("location_state").distinct().show()

In [95]:
user_log_with_churn.dropDuplicates(["userId", "downGrade"]).groupby(["churn", "downGrade"]).count().sort("churn").show()

+-----+---------+-----+
|churn|downGrade|count|
+-----+---------+-----+
|false|    false|13314|
|false|     true| 3960|
| true|    false| 3860|
| true|     true| 1143|
+-----+---------+-----+

In [96]:
# Gender distribution of churned vs. retained users
user_log_with_churn.dropDuplicates(["userId", "gender"]).groupby(["churn", "gender"]).count().sort("churn").show()

+-----+------+-----+
|churn|gender|count|
+-----+------+-----+
|false|     M| 8995|
|false|     F| 8279|
| true|     M| 2656|
| true|     F| 2347|
+-----+------+-----+
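
The raw counts in the two tables above are easier to compare as churn rates. The following sketch turns them into the share of churned users per group; the numbers are copied from the tables and `churn_rate` is a hypothetical helper.

```python
def churn_rate(churned: int, stayed: int) -> float:
    """Share of users in a group who churned."""
    return churned / (churned + stayed)


# (churned, stayed) user counts copied from the two tables above
groups = {
    "no downgrade": (3860, 13314),
    "downgrade": (1143, 3960),
    "male": (2656, 8995),
    "female": (2347, 8279),
}

for name, (churned, stayed) in groups.items():
    print(f"{name:>12}: {churn_rate(churned, stayed):.1%}")
```

Every group ends up between roughly 22% and 23%, so neither a past downgrade nor gender separates churned from retained users much on its own.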

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [97]:
# Total events per user (the count includes rows where `song` is null)
song_per_user = user_log_with_churn.select("userId","song").groupby("userId").count().withColumnRenamed("count","song_per_user")
song_per_user.describe().show()

+-------+-----------------+------------------+
|summary|           userId|     song_per_user|
+-------+-----------------+------------------+
|  count|            22277|             22277|
|   mean|1498793.602415047|1143.8129011985457|
| stddev|288853.9641462556|1321.2139656987092|
|    min|          1000025|                 1|
|    max|          1999996|             13591|
+-------+-----------------+------------------+

In [98]:
# Gender
gender_num = user_log_with_churn.select('userId','gender').dropDuplicates().replace(['M','F'],['0','1'],'gender').select('userId',col('gender').cast('int')).withColumnRenamed('gender', 'gender_num') 
gender_num.describe().show()

+-------+------------------+-------------------+
|summary|            userId|         gender_num|
+-------+------------------+-------------------+
|  count|             22277|              22277|
|   mean| 1498793.602415047|0.47699420927413927|
| stddev|288853.96414625563| 0.4994816640021065|
|    min|           1000025|                  0|
|    max|           1999996|                  1|
+-------+------------------+-------------------+

In [99]:
# Thumbs up
thumbs_up = user_log_with_churn.select('userID','page').where(user_log_with_churn.page == 'Thumbs Up').groupBy('userID').count().withColumnRenamed('count', 'thumbs_up') 
thumbs_up.describe().show()


+-------+-----------------+------------------+
|summary|           userID|         thumbs_up|
+-------+-----------------+------------------+
|  count|            21732|             21732|
|   mean|1498998.299006074|52.984769004233385|
| stddev|288869.5478645166| 64.86699983998632|
|    min|          1000025|                 1|
|    max|          1999996|               836|
+-------+-----------------+------------------+

In [100]:
# Thumbs down
thumbs_down = user_log_with_churn.select('userID','page').where(user_log_with_churn.page == 'Thumbs Down').groupBy('userID').count().withColumnRenamed('count', 'thumbs_down')
thumbs_down.describe().show()

+-------+------------------+------------------+
|summary|            userID|       thumbs_down|
+-------+------------------+------------------+
|  count|             20031|             20031|
|   mean|1498750.1784733662|11.942089760870651|
| stddev|288815.97958919645|12.752728847840007|
|    min|           1000025|                 1|
|    max|           1999996|               154|
+-------+------------------+------------------+

In [101]:
# Listening time
listening_time = user_log_with_churn.select('userId','length').groupby(['userId']).sum().withColumnRenamed('sum(length)','listening_time')

listening_time.describe().show()

+-------+-----------------+------------------+
|summary|           userId|    listening_time|
+-------+-----------------+------------------+
|  count|            22277|             22261|
|   mean|1498793.602415047|232963.16116480654|
| stddev|288853.9641462556|273559.41985437524|
|    min|          1000025|          78.49751|
|    max|          1999996|2807182.3311500004|
+-------+-----------------+------------------+

In [102]:
# Level
level_num = user_log_with_churn.select('userId','level').replace(['free','paid'],['0','1'],'level').dropDuplicates().select('userId',col('level').cast('int')).withColumnRenamed('level', 'level_num') 
level_num.describe().show()

+-------+------------------+-------------------+
|summary|            userId|          level_num|
+-------+------------------+-------------------+
|  count|             34976|              34976|
|   mean|1499035.0724210888|0.46271729185727356|
| stddev| 289249.8461655897|0.49861519019549766|
|    min|           1000025|                  0|
|    max|           1999996|                  1|
+-------+------------------+-------------------+

In [103]:
label = user_log_with_churn.select('userId','churn').replace(['True','False'],['1','0'],'churn').dropDuplicates().select('userId',col('churn').cast('int')).withColumnRenamed('churn', 'label') 
label.describe().show()

+-------+-----------------+-------------------+
|summary|           userId|              label|
+-------+-----------------+-------------------+
|  count|            22277|              22277|
|   mean|1498793.602415047|0.22458140683215874|
| stddev|288853.9641462554| 0.4173157271489029|
|    min|          1000025|                  0|
|    max|          1999996|                  1|
+-------+-----------------+-------------------+

In [104]:
# Downgrade flag: `downGrade` is boolean, so the cast alone maps True to 1 and False to 0
level_dg = user_log_with_churn.select('userId','downGrade').dropDuplicates().select('userId',col('downGrade').cast('int')).withColumnRenamed('downGrade', 'dg_num') 
level_dg.describe().show()

+-------+-----------------+-------------------+
|summary|           userId|             dg_num|
+-------+-----------------+-------------------+
|  count|            22277|              22277|
|   mean|1498793.602415047|0.22907034160793643|
| stddev|288853.9641462556|0.42024403373393693|
|    min|          1000025|                  0|
|    max|          1999996|                  1|
+-------+-----------------+-------------------+

In [105]:
# Joining data set 
data = song_per_user.join(gender_num,'userID','outer').join(thumbs_up,'userID','outer').join(thumbs_down, 'userID','outer').join(listening_time,'userID','outer').join(level_num,'userID','outer').join(label,'userID','outer').drop('userID').fillna(0)

data.show(5)

+-------------+----------+---------+-----------+------------------+---------+-----+
|song_per_user|gender_num|thumbs_up|thumbs_down|    listening_time|level_num|label|
+-------------+----------+---------+-----------+------------------+---------+-----+
|         1317|         0|       53|         33|259349.89726000006|        1|    1|
|         1317|         0|       53|         33|259349.89726000006|        0|    1|
|         2080|         1|       92|         14|      443147.60184|        1|    0|
|          320|         1|        7|          4| 63271.01815999999|        1|    0|
|         1752|         1|       66|         16|364286.86247000005|        1|    0|
+-------------+----------+---------+-----------+------------------+---------+-----+
only showing top 5 rows

In [106]:
data.printSchema()

root
 |-- song_per_user: long (nullable = true)
 |-- gender_num: integer (nullable = true)
 |-- thumbs_up: long (nullable = true)
 |-- thumbs_down: long (nullable = true)
 |-- listening_time: double (nullable = false)
 |-- level_num: integer (nullable = true)
 |-- label: integer (nullable = true)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [107]:
# Vector assembler
cols = ['song_per_user', 'gender_num', 'thumbs_up', 'thumbs_down','level_num','listening_time']
assembler = VectorAssembler(inputCols=cols, outputCol="NumFeatures")
data = assembler.transform(data)

In [108]:
# Standard scaler
scaler = StandardScaler(inputCol="NumFeatures", outputCol="features", withStd=True)
scalerModel = scaler.fit(data)
data = scalerModel.transform(data)

In [70]:
# Splitting data into train and validation set
train, validation = data.randomSplit([0.8, 0.2], seed=42)

In [71]:
lr =  LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

In [72]:
f_score = MulticlassClassificationEvaluator(metricName='f1')

In [111]:
lr_train = lr.fit(train)
lr_test = lr_train.transform(validation)

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')

print('Accuracy:')
print(evaluator.evaluate(lr_test,{evaluator.metricName: 'accuracy'}))
print('F-1 score:')
print(evaluator.evaluate(lr_test, {evaluator.metricName: 'f1'}))


Accuracy:
0.765062791025822
F-1 score:
0.6666129161627117

### Random forest

In [110]:
rf = RandomForestClassifier()
f_score=MulticlassClassificationEvaluator(metricName='f1')

rf_train = rf.fit(train)
rf_test = rf_train.transform(validation)


evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Accuracy:')
print(evaluator.evaluate(rf_test,{evaluator.metricName: 'accuracy'}))
print('F-1 score:')
print(evaluator.evaluate(rf_test, {evaluator.metricName: 'f1'}))

Accuracy:
0.7653449978834486
F-1 score:
0.6636130235024897

### Gradient Boosting

In [109]:
gb = GBTClassifier()
f_score=MulticlassClassificationEvaluator(metricName='f1')

gb_train = gb.fit(train)
gb_test = gb_train.transform(validation)
    

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')

print('Accuracy')
print(evaluator.evaluate(gb_test,{evaluator.metricName: 'accuracy'}))
print('F-1 score')
print(evaluator.evaluate(gb_test, {evaluator.metricName: 'f1'}))

Accuracy
0.7654861013122619
F-1 score
0.6728373820951602

### Preliminary Results and Refinement

Overall, the three baseline models perform almost identically: accuracy is about 0.765 for each, and gradient boosting has the highest F1 score (0.673).
Random forest had the lowest F1 score (0.664), slightly below logistic regression (0.667). As a linear specification, logistic regression has more bias but tends to overfit less than the decision-tree-based algorithms.

Nevertheless, I will proceed with the gradient boosting algorithm, because its scores on the first run were the best. Despite the risk of overfitting, I expect it to generally perform better on the entire Sparkify dataset.
In the next stage I will therefore try to improve on the gradient boosting specification.

I will use grid search to find the best hyperparameters. An even 50-50 split of the data would additionally account for the small number of churned customers in the test sample; the cells below, however, reuse the 80/20 split defined earlier.

### Gradient Boosting

In [74]:
gb_tuned = GBTClassifier()

In [75]:
# Tuning the hyperparameter
paramGrid = ParamGridBuilder()\
       .addGrid(gb_tuned.maxIter,[5,10])\
       .addGrid(gb_tuned.maxDepth,[4,12]) \
       .build()


crossval_gbt = CrossValidator(estimator=gb_tuned,
                           evaluator=f_score,
                           estimatorParamMaps=paramGrid,
                           numFolds=3)

In [76]:
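# Note: this fits the default estimator `gb_tuned`; the `crossval_gbt` grid search defined above
# is never fitted, which is why the scores below are identical to the baseline run.
gb_train = gb_tuned.fit(train)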
gb_test = gb_train.transform(validation)
    

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
print('Gradient Boosting Results')
print('Accuracy')
print(evaluator.evaluate(gb_test,{evaluator.metricName: 'accuracy'}))
print('F-1 score')
print(evaluator.evaluate(gb_test, {evaluator.metricName: 'f1'}))

Gradient Boosting Results
Accuracy
0.7654861013122619
F-1 score
0.6728373820951602

### Feature Importance

In [77]:
gb_train.featureImportances

SparseVector(7, {0: 0.2124, 1: 0.0348, 2: 0.2882, 3: 0.208, 4: 0.0279, 5: 0.1727, 6: 0.0559})

In [None]:
# Values copied from the `featureImportances` output above, in vector order
importances = [0.2124, 0.0348, 0.2882, 0.208, 0.0279, 0.1727, 0.0559]
features = ['song_per_user', 'gender_num', 'thumbs_up', 'thumbs_down','level_num','listening_time', 'dg_num']

In [None]:
 
plt.barh(features, importances, align='center', alpha=0.5)
plt.xlabel('Importance Score')
plt.title('Feature Importances')
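
A ranked listing makes the chart easier to read. The sketch below pairs the scores from the `SparseVector` output above with the feature names and sorts them. It assumes the seven vector positions follow the order of the `features` list; note that the assembler cell in this notebook lists only six columns, so this mapping is an assumption.

```python
features = ["song_per_user", "gender_num", "thumbs_up", "thumbs_down",
            "level_num", "listening_time", "dg_num"]
# Scores copied from the SparseVector output, assumed to be in the same order
importances = [0.2124, 0.0348, 0.2882, 0.208, 0.0279, 0.1727, 0.0559]

# Sort feature/score pairs from most to least important
ranked = sorted(zip(features, importances), key=lambda pair: pair[1], reverse=True)
for name, score in ranked:
    print(f"{name:<15} {score:.4f}")
```

Under that assumption, `thumbs_up`, `song_per_user`, `thumbs_down` and `listening_time` come first, in that order.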

## Summary

In this project, I first introduced three baseline algorithms to predict customer churn for the Sparkify app. The best-performing one was the gradient boosting algorithm, with an accuracy of about 76.5% and an F1 score of about 67.3% on the validation set.

Second, based on its performance, I proceeded with the gradient boosting algorithm and set up hyperparameter tuning via grid search. The reason for this strategy is the assumption that gradient boosting will, despite overfitting, perform best on the whole dataset.

After the second run of gradient boosting, the results are identical to those from the baseline model: accuracy stayed at about 76.5% and the F1 score at about 67.3%. As a next step we should test gradient boosting on the entire dataset.

Furthermore, I examined the most important factors for the churn rate, as displayed in the feature importance section. The most important factors in customer churn seem to be the number of songs played per user, user engagement as represented by the thumbs up/down features, and overall listening time.