# Sparkify Project Workspace AWS - Full Dataset (12GB)

## Overview

**Sparkify is a popular (not real!) music service** similar to Spotify or Pandora with a **subscription-based business model**. Each user can listen to their favorite music every day either through the **free-tier** that inserts advertisements between songs or by using a **subscription plan where you pay a fixed monthly fee**. Users can **upgrade, downgrade, or cancel the service at any time**, so it's critical to be sure users love the service. 

**Every time a user interacts with the service it generates (synthetic) data**. Each event (e.g., song played, logout, like, downgrade, ...) is recorded with the corresponding timestamp. All of this information holds the key to keeping users happy and business thriving.

Our **goal** is to answer the following question:<br>

**Which users are at risk of churn, i.e. downgrade from premium service to free-tier plan or cancellation of service altogether?**

By **identifying** these **users before they abandon the service**, we can **proactively engage with them** by offering some discounts and/or incentives, **saving a lot of money** from a business perspective.


In [None]:
print("Welcome to my EMR Notebook!")

## Installing Missing Libraries 

In [None]:
# sc.install_pypi_package("Cython")
sc.install_pypi_package("pandas==1.0.5")
sc.install_pypi_package("matplotlib==3.2.2", "https://pypi.org/simple")
sc.install_pypi_package("seaborn")

In [None]:
sc.list_packages()

## Import Libraries

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StringType, DateType
from pyspark.sql import Window
from pyspark.sql import functions as F

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, NaiveBayes, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
from pyspark.ml.feature import StringIndexer, VectorAssembler, Normalizer, StandardScaler, MinMaxScaler, IndexToString
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import *

import seaborn as sns


## Create a Spark session

In [None]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

## Import data from S3 Repository

In [None]:
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data) 

## Explore and Prepare Data

In [None]:
df.printSchema()

In [None]:
# check dataframe shape
print(f'The dataset at hand contains {df.count()} events and {len(df.columns)} features recorded.')

In [None]:
df.head()

In [None]:
# .show() only prints and returns None, so collect the scalar result instead
num_unique_users = df.select(F.countDistinct('userId')).collect()[0][0]

print(f'There are {num_unique_users} unique users.')

In [None]:
# check for missing values

def count_missing(df, col_name):
    """
    Count missing values (NaN, null or empty string) in column `col_name` of a Spark DataFrame.
    """
    return df.filter((isnan(df[col_name])) | (df[col_name].isNull()) | (df[col_name] == "")).count()

In [None]:
# use col_name rather than col, which would shadow pyspark's col() imported via *
for col_name in df.columns:
    missing_count = count_missing(df, col_name)
    if missing_count > 0:
        print(f"{col_name}: {missing_count}")
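The check above counts a value as missing when it is null, NaN, or an empty string. As an illustration only, here is a minimal pure-Python sketch of the same three-way predicate applied to a few hypothetical rows stored as dictionaries (`is_missing`, `count_missing_rows` and the sample rows are invented for this example, not taken from the dataset):

```python
import math
from typing import Any, Dict, List


def is_missing(value: Any) -> bool:
    """Mirror the three conditions of count_missing: null, NaN, or empty string."""
    if value is None:
        return True
    if isinstance(value, float) and math.isnan(value):
        return True
    return value == ""


def count_missing_rows(rows: List[Dict[str, Any]], col: str) -> int:
    """Count rows whose value in `col` is missing (an absent key counts as null)."""
    return sum(1 for row in rows if is_missing(row.get(col)))


# Hypothetical sample rows
rows = [
    {"userId": "100", "length": 231.5},
    {"userId": "", "length": None},
    {"userId": "101", "length": float("nan")},
]

for col in ["userId", "length"]:
    print(col, count_missing_rows(rows, col))
```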

In [None]:
def clean_data(df):
    '''
    Function that performs data cleaning of Sparkify dataset.
    
    INPUT: 
    df - pyspark dataframe containing Sparkify events
    
    OUTPUT:
    df_clean - pyspark dataframe without rows whose 'userId' is empty or null, and without duplicate rows
    '''
    
    # remove rows where userId is empty and duplicated rows
    df_clean = df.filter(df["userId"] != "").dropDuplicates()

    return df_clean


def get_date_from_timestamp(df, col_name, new_col_name):
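    '''
    Convert a Unix timestamp in milliseconds (column col_name) into a timestamp string
    (format 'yyyy-MM-dd HH:mm:ss', Spark session time zone) stored in column new_col_name.
    '''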
    
    return df.withColumn(new_col_name, F.to_timestamp(F.col(col_name) / 1000).astype(StringType()))


def prepare_dataset(df):
    '''
    Prepare the Sparkify event log for machine learning.
    
    INPUT:
    df - initial dataset loaded from the json file
    
    OUTPUT:
    df_ml - new dataset prepared for machine learning, containing the following columns:
    
    1. userId - initial id of the user
    2. gender - user's gender (1 = female, 0 = male)
    3. churn_cancellation - 1 if the user has a 'Cancellation Confirmation' event, else 0
    4. days_registered - days from registration to the user's last event
    5. last_state - state (last two characters of 'location') of the user's last event
    6. avg_songs_per_day - average number of songs played per day with at least one song
    7. last_level - subscription level of the user's last event (1 = paid, 0 = free)
    8. Thumbsup_proportion - ratio of thumbs-up to thumbs-down events (0 if either count is 0)
    9. num_add_friend - number of 'Add Friend' events
    10. avg_roll_adv_per_day - average number of 'Roll Advert' events per day with at least one advert
    '''
    
    # Clean dataset using clean_data function
    df = clean_data(df)
    
    # Convert event timestamp
    df = get_date_from_timestamp(df, 'ts', 'ts_date')
    
    # Convert registration timestamp
    df = get_date_from_timestamp(df, 'registration', 'registration_date')
    
    # Create column 'churn_cancellation': 1 on every event of a user who has at least
    # one 'Cancellation Confirmation' event (the user cancelled the subscription), 0 otherwise.
    # A window max avoids collecting the list of churned userIds to the driver.
    w_user = Window.partitionBy('userId')
    df = df.withColumn('churn_cancellation',
                       F.max(F.when(F.col('page') == 'Cancellation Confirmation', 1).otherwise(0)).over(w_user))
    
    # Convert column gender to numeric: 1 for 'female' and 0 for 'male'
    gender = udf(lambda x: 1 if x=="F" else 0, IntegerType())
    
    df = df.withColumn("gender", gender("gender"))
    
    # Last subscription level per user: 1 for 'paid' and 0 for 'free'.
    # orderBy followed by dropDuplicates does not guarantee which row is kept,
    # so rank each user's events by timestamp and keep the most recent one.
    w_last = Window.partitionBy('userId').orderBy(F.desc('ts'))
    levels = df.withColumn('row_num', F.row_number().over(w_last))\
               .where(F.col('row_num') == 1)\
               .select(F.col('userId').alias('level_userId'),
                       F.when(F.col('level') == 'paid', 1).otherwise(0).alias('last_level'))

    
    # Compute days_registered as the number of days from registration to the user's last event
    # Create dataframe with the last timestamp (ts_date) per user
    df_last_ts = df.groupBy('userId').agg(F.max('ts_date').alias('last_interaction'))
    
    # Join on 'userId' (the join keeps a single userId column) and add 'days_registered'
    df = df.join(df_last_ts, on='userId')\
           .withColumn('days_registered',
                       datediff(F.col('last_interaction'), F.col('registration_date')).cast('float'))
    
    # Create a new column 'date' with format 'yyyy-MM-dd'
    df = df.withColumn('date', date_format('ts_date', 'yyyy-MM-dd'))
    
    # Extract the state (last two characters of 'location') and store it in 'last_state'
    # only for the user's last event (null elsewhere)
    df = df.withColumn("state", substring(df.location, -2, 2))
    df = df.withColumn('last_state', when(F.col('last_interaction') == F.col('ts_date'), F.col('state')))
        
    # Compute the average number of songs played per day for each user
    w = Window.partitionBy('userId', 'date')
    songs = df.where(df.page == 'NextSong').select('userId', 'date', count('userId').over(w).alias('songs')).distinct()
    w = Window.partitionBy('userId')
    songs = songs.withColumn('avg_songs_per_day', avg('songs').over(w))
    songs = songs.select(songs['userId'].alias('songs_userId'), 'avg_songs_per_day')
    songs = songs.withColumn('avg_songs_per_day', F.round(songs['avg_songs_per_day'],2)).distinct()
    
    # Compute the number of thumbs up per user
    w = Window.partitionBy('userId')
    thumbs_up = df.where(df.page == 'Thumbs Up').select('userId', count('userId').over(w).alias('thumbs_up')).distinct()
    thumbs_up = thumbs_up.select(thumbs_up['userId'].alias('thumbsup_userId'), 'thumbs_up')
 
    # Compute the number of thumbs down per user
    w = Window.partitionBy('userId')
    thumbs_down = df.where(df.page == 'Thumbs Down').select('userId', count('userId').over(w).alias('thumbs_down')).distinct()
    thumbs_down = thumbs_down.select(thumbs_down['userId'].alias('thumbsdown_userId'), 'thumbs_down')
      
    # Compute the number of add friend events per user
    w = Window.partitionBy('userId')
    num_add_friend = df.where(df.page == 'Add Friend').select('userId', count('userId').over(w).alias('num_add_friend')).distinct()
    num_add_friend = num_add_friend.select(num_add_friend['userId'].alias('friends_userId'), 'num_add_friend')
    
    # Compute the average number of 'Roll Advert' events per day for each user
    w = Window.partitionBy('userId', 'date')
    roll_adv = df.where(df.page == 'Roll Advert').select('userId', 'date', count('userId').over(w).alias('roll_adv')).distinct()
    w = Window.partitionBy('userId')
    roll_adv = roll_adv.withColumn('avg_roll_adv_per_day', avg('roll_adv').over(w))
    roll_adv = roll_adv.select(roll_adv['userId'].alias('rolladv_userId'), 'avg_roll_adv_per_day')
    roll_adv = roll_adv.withColumn('avg_roll_adv_per_day', F.round(roll_adv['avg_roll_adv_per_day'],2)).distinct()
    
    # Construct the final dataset
    df_ml = df.select('userId', 'gender', 'churn_cancellation', 'days_registered', 'last_state').dropna().drop_duplicates()
    df_ml = df_ml.join(songs, df_ml.userId == songs.songs_userId, how='left').distinct() 
    df_ml = df_ml.join(levels, df_ml.userId == levels.level_userId, how='left').distinct()
    df_ml = df_ml.join(thumbs_up, df_ml.userId == thumbs_up.thumbsup_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['thumbs_up'])
    df_ml = df_ml.join(thumbs_down, df_ml.userId == thumbs_down.thumbsdown_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['thumbs_down'])
    df_ml = df_ml.withColumn('Thumbsup_proportion', F.round(df_ml.thumbs_up / df_ml.thumbs_down, 2))
    df_ml = df_ml.fillna(0, subset=['Thumbsup_proportion'])
    df_ml = df_ml.join(num_add_friend, df_ml.userId == num_add_friend.friends_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['num_add_friend'])
    df_ml = df_ml.join(roll_adv, df_ml.userId == roll_adv.rolladv_userId, how='left').distinct()
    df_ml = df_ml.fillna(0, subset=['avg_roll_adv_per_day'])
    df_ml = df_ml.drop('thumbs_up', 'thumbs_down', 'songs_userId', 'level_userId', 'rolladv_userId', 'thumbsup_userId', 'thumbsdown_userId', 'friends_userId')

    return df_ml
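`prepare_dataset` builds its per-user features with window functions and joins, which makes the intended semantics hard to verify by eye. The pure-Python sketch below recomputes four of them (`churn_cancellation`, `last_level`, `days_registered`, `avg_songs_per_day`) on a tiny, invented event log that reuses the Sparkify field names. It is a hypothetical reference for checking results on a handful of rows, not part of the pipeline, and it assumes UTC calendar dates, whereas Spark converts timestamps using the session time zone.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import mean


def to_utc_date(ms):
    """Convert a millisecond epoch timestamp to a UTC calendar date."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).date()


def user_features(events):
    """Aggregate a list of event dicts into one feature dict per userId."""
    by_user = defaultdict(list)
    for e in events:
        if e["userId"] != "":  # same filter as clean_data
            by_user[e["userId"]].append(e)

    features = {}
    for user, evs in by_user.items():
        last = max(evs, key=lambda e: e["ts"])  # most recent event
        songs_per_day = defaultdict(int)
        for e in evs:
            if e["page"] == "NextSong":
                songs_per_day[to_utc_date(e["ts"])] += 1
        features[user] = {
            "churn_cancellation": int(any(e["page"] == "Cancellation Confirmation" for e in evs)),
            "last_level": int(last["level"] == "paid"),
            # like Spark's datediff: difference between calendar dates
            "days_registered": (to_utc_date(last["ts"]) - to_utc_date(last["registration"])).days,
            # averaged only over days with at least one song; None if no songs
            "avg_songs_per_day": round(mean(songs_per_day.values()), 2) if songs_per_day else None,
        }
    return features


# Invented events; BASE is 2018-10-01 00:00:00 UTC in milliseconds
BASE, DAY = 1538352000000, 86_400_000
events = [
    {"userId": "1", "ts": BASE + 1000, "registration": BASE - 10 * DAY, "page": "NextSong", "level": "free"},
    {"userId": "1", "ts": BASE + 2000, "registration": BASE - 10 * DAY, "page": "NextSong", "level": "free"},
    {"userId": "1", "ts": BASE + DAY + 1000, "registration": BASE - 10 * DAY, "page": "NextSong", "level": "paid"},
    {"userId": "1", "ts": BASE + DAY + 5000, "registration": BASE - 10 * DAY, "page": "Cancellation Confirmation", "level": "paid"},
    {"userId": "2", "ts": BASE + 3000, "registration": BASE - 2 * DAY, "page": "Home", "level": "free"},
    {"userId": "", "ts": BASE + 4000, "registration": None, "page": "Home", "level": "free"},
]

for user_id, feats in sorted(user_features(events).items()):
    print(user_id, feats)
```

As in the Spark code, a user without any `NextSong` event gets no song average (the left join in `prepare_dataset` leaves a null there).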


In [None]:
df_ml = prepare_dataset(df)    

In [None]:
df_ml_pandas = df_ml.toPandas()

In [None]:
df_ml_pandas.shape[0]

In [None]:
df_ml_pandas.head()

In [None]:
df_ml_pandas.churn_cancellation.value_counts()

In [None]:
# Check correlation with the target variable 'churn_cancellation'
df_ml_pandas.corr()['churn_cancellation'].abs().sort_values(ascending = False)

In [None]:
# Check correlation among features

# creating mask
mask = np.triu(np.ones_like(df_ml_pandas.corr()))
 
# plotting a triangle correlation heatmap
dataplot = sns.heatmap(df_ml_pandas.corr(), cmap="YlGnBu", annot=True, mask=mask)

# displaying heatmap
plt.show()

%matplot plt

## Modeling

In [None]:
# Model evaluator helper (a plain Python function, not a Spark UDF)
def udfModelEvaluator(dfPredictions, labelColumn='label'):
    """
    Compute binary classification metrics for the churned class (label 1.0)
    from a DataFrame holding a 'prediction' column and a label column.
    """
    colSelect = dfPredictions.select(
      [F.col('prediction').cast(DoubleType())
       ,F.col(labelColumn).cast(DoubleType()).alias('label')])

    metrics = MulticlassMetrics(colSelect.rdd)

    # Compute the confusion matrix once: rows are actual labels, columns are predictions
    confusion = metrics.confusionMatrix().toArray()
    mMatrix = confusion.astype(int)

    mTP = confusion[1][1]
    mTN = confusion[0][0]
    mFP = confusion[0][1]
    mFN = confusion[1][0]
    
    mAccuracy = metrics.accuracy
    mPrecision = mTP / (mTP + mFP)
    mRecall = mTP / (mTP + mFN)
    mF1 = metrics.fMeasure(1.0)

    mResults = [mAccuracy, mPrecision, mRecall, mF1, mMatrix, mTP, mTN, mFP, mFN, "Return [[0]=Accuracy, [1]=Precision, [2]=Recall, [3]=F1, [4]=ConfusionMatrix, [5]=TP, [6]=TN, [7]=FP, [8]=FN]"]

    return mResults
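`udfModelEvaluator` reads the four counts off Spark's confusion matrix, whose rows are actual labels and columns are predicted labels, so for labels {0, 1} the true positives sit at `[1][1]` and the false positives at `[0][1]`. The formulas can be checked with a small pure-Python sketch (`binary_metrics` is a hypothetical helper; unlike the notebook function, it returns 0.0 instead of NaN when precision or recall has a zero denominator):

```python
def binary_metrics(tp, tn, fp, fn):
    """Return (accuracy, precision, recall, f1) for the positive (churned) class."""
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    # F1 is the harmonic mean of precision and recall
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return accuracy, precision, recall, f1


# Hypothetical counts, not results from this notebook
acc, prec, rec, f1 = binary_metrics(tp=8, tn=80, fp=2, fn=10)
print(round(acc, 3), round(prec, 3), round(rec, 3), round(f1, 3))  # 0.88 0.8 0.444 0.571
```

The F1 value here is the F1 score of the churned class, which is what `metrics.fMeasure(1.0)` reports.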

### Split into Train and Test Sets

In [None]:
# split into train and test sets (80% - 20%)

df_ml = df_ml.withColumnRenamed("churn_cancellation", "label")

train, test = df_ml.randomSplit([0.8, 0.2], seed = 42)


In [None]:
counts_train = train.groupBy('label').count().toPandas()
print(counts_train)

In [None]:
# Counts
count_churned = counts_train[counts_train['label']==1]['count'].values[0]
count_total = counts_train['count'].sum()

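# Balanced class weights: n_total / (n_classes * n_class), so that both classes
# contribute the same total weight during training
c = 2  # number of classes
weight_churned = count_total / (c * count_churned)
weight_no_churned = count_total / (c * (count_total - count_churned))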
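The weights follow the "balanced" heuristic (the same formula scikit-learn uses for `class_weight='balanced'`): each class receives `n_total / (n_classes * n_class)`, so after weighting each class contributes `n_total / n_classes` in total. A quick check with hypothetical counts of 100 users, 20 of them churned:

```python
def balanced_weights(n_total, n_positive, n_classes=2):
    """Return (weight_positive, weight_negative) = n_total / (n_classes * n_class)."""
    n_negative = n_total - n_positive
    weight_positive = n_total / (n_classes * n_positive)
    weight_negative = n_total / (n_classes * n_negative)
    return weight_positive, weight_negative


# Hypothetical counts: 100 users in the training set, 20 of them churned
w_pos, w_neg = balanced_weights(n_total=100, n_positive=20)
print(w_pos, w_neg)            # 2.5 0.625
print(20 * w_pos, 80 * w_neg)  # 50.0 50.0
```

Spark estimators only use such a column when it is passed through their `weightCol` parameter.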

In [None]:
train = train.withColumn("weight", when(train.label ==1, weight_churned).otherwise(weight_no_churned))
train.select('label', 'weight').where(train.label ==1).show(3)

### Machine Learning Pipelines

In [None]:
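# Index and encode categorical feature 'last_state'
# handleInvalid="keep" assigns states not seen during fitting to an extra index instead of raising an error
stringIndexerState = StringIndexer(inputCol="last_state", 
                                   outputCol="stateIndex",
                                   handleInvalid="keep")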

encoder = OneHotEncoder(inputCol="stateIndex",
                        outputCol="stateVec")

# Create a vector for features to be used in the models
features = ['stateVec', 'gender', 'days_registered', 'avg_songs_per_day', 'last_level', 'Thumbsup_proportion', 'num_add_friend', 'avg_roll_adv_per_day']

# Merge multiple columns into a vector column
assemblers = VectorAssembler(inputCols=features, outputCol="rawFeatures", handleInvalid = "keep")

# Scale features
scalers = MinMaxScaler(inputCol="rawFeatures", outputCol='features')
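By default, `StringIndexer` gives index 0 to the most frequent label (`stringOrderType='frequencyDesc'`) and `OneHotEncoder` drops the last category (`dropLast=True`), so the last category is encoded as an all-zero vector with one slot fewer than the number of categories. This layout matters when feature importances are mapped back to state names. A pure-Python sketch of it, with hypothetical state values and helper names (ties are broken alphabetically here, an assumption of the sketch):

```python
from collections import Counter


def fit_string_indexer(values):
    """Return labels ordered by descending frequency (index = position in the list)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


def one_hot_drop_last(index, num_categories):
    """One-hot encode `index` into num_categories - 1 slots; the last category is all zeros."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec


states = ["CA", "NY", "CA", "TX", "CA", "NY"]
labels = fit_string_indexer(states)  # ['CA', 'NY', 'TX']
for state in labels:
    print(state, one_hot_drop_last(labels.index(state), len(labels)))
# CA [1.0, 0.0]
# NY [0.0, 1.0]
# TX [0.0, 0.0]
```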


### Logistic Regression Classifier

In [None]:
# Initialize Logistic Regression Classifier, using the class weights stored in the 'weight' column
lr = LogisticRegression(featuresCol='features', labelCol='label', weightCol='weight')

# Assemble pipeline
pipeline_lr = Pipeline(stages=[stringIndexerState, encoder, assemblers, scalers, lr])

#### Cross Validation and Parameter Tuning

In [None]:
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 30, 50]) \
    .addGrid(lr.regParam, [0.1, 0.3]) \
    .addGrid(lr.elasticNetParam, [0.8])\
    .addGrid(lr.family, ['auto'])\
    .build()
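`ParamGridBuilder` produces the Cartesian product of the listed values, and `CrossValidator` fits one model per parameter map and fold, then refits the best map on the whole training set. The hypothetical sketch below mirrors `paramGrid_lr` with plain dictionaries to count how many fits a search costs:

```python
from itertools import product

# Plain-dict mirror of paramGrid_lr (illustration only)
grid_lr = {
    "maxIter": [10, 30, 50],
    "regParam": [0.1, 0.3],
    "elasticNetParam": [0.8],
    "family": ["auto"],
}


def expand_grid(grid):
    """Return one dict per parameter combination (Cartesian product of the grids)."""
    names = list(grid)
    return [dict(zip(names, values)) for values in product(*(grid[n] for n in names))]


def cv_fit_count(num_param_maps, num_folds):
    """Fits done by k-fold cross validation, plus the final refit of the best map."""
    return num_param_maps * num_folds + 1


param_maps = expand_grid(grid_lr)
print(len(param_maps))                   # 6
print(cv_fit_count(len(param_maps), 3))  # 19
```

The same arithmetic gives 18 parameter maps (55 fits) for `paramGrid_rf` and 9 maps (28 fits) for `paramGrid_gbt`.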

In [None]:
crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)
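After fitting, `CrossValidator` averages the evaluator's metric over the folds for every parameter map (exposed as `avgMetrics`, which the validation-results cells tabulate) and keeps the map with the best average. With the default `MulticlassClassificationEvaluator` that metric is the weighted F1 score (`metricName='f1'`), where larger is better. The selection step itself is an arg-max, sketched here in pure Python with invented fold scores (`select_best` is a hypothetical helper):

```python
from statistics import mean


def select_best(param_maps, fold_scores):
    """Pick the parameter map with the largest mean validation score.

    fold_scores[i] lists the metric of param_maps[i] on each fold;
    larger is assumed to be better, which holds for F1.
    """
    avg_metrics = [mean(scores) for scores in fold_scores]
    best_index = max(range(len(avg_metrics)), key=lambda i: avg_metrics[i])
    return avg_metrics, param_maps[best_index]


# Invented fold scores for two regParam values, 3 folds each
param_maps = [{"regParam": 0.1}, {"regParam": 0.3}]
fold_scores = [[0.70, 0.74, 0.72], [0.75, 0.73, 0.77]]
avg_metrics, best = select_best(param_maps, fold_scores)
print([round(m, 2) for m in avg_metrics], best)  # [0.72, 0.75] {'regParam': 0.3}
```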

In [None]:
cvModel_lr = crossval_lr.fit(train)

In [None]:
bestModel_lr = cvModel_lr.bestModel

In [None]:
pred_train_best_lr = bestModel_lr.transform(train)
pred_test_best_lr = bestModel_lr.transform(test)

In [None]:
predictionAndLabels = pred_train_best_lr.select('prediction', 'label')
metricsList_train_best_lr = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Train set:')
print(' ')
print(f'Accuracy: {metricsList_train_best_lr[0]}')
print(f'Precision: {metricsList_train_best_lr[1]}')
print(f'Recall: {metricsList_train_best_lr[2]}')
print(f'F1-score: {metricsList_train_best_lr[3]}')
print(' ')

In [None]:
plt.clf()

ax= plt.subplot()
sns.heatmap(metricsList_train_best_lr[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt

In [None]:
predictionAndLabels = pred_test_best_lr.select('prediction', 'label')
metricsList_test_best_lr = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Test set:')
print(' ')
print(f'Accuracy: {metricsList_test_best_lr[0]}')
print(f'Precision: {metricsList_test_best_lr[1]}')
print(f'Recall: {metricsList_test_best_lr[2]}')
print(f'F1-score: {metricsList_test_best_lr[3]}')
print(' ')

In [None]:
plt.clf()

ax= plt.subplot()
sns.heatmap(metricsList_test_best_lr[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt

In [None]:
# Get the results of cross validation

# get parameters
params = [{p.name: v for p, v in m.items()} for m in cvModel_lr.getEstimatorParamMaps()]

# Convert validation results to pandas dataframe
validation_results_lr = pd.DataFrame.from_dict([
    {cvModel_lr.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, cvModel_lr.avgMetrics)
])

In [None]:
validation_results_lr

### Random Forest Classifier

In [None]:
# Initialize Random Forest Classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Assemble pipeline
pipeline_rf = Pipeline(stages=[stringIndexerState, encoder, assemblers, scalers, rf])

#### Cross Validation and Parameters Tuning

In [None]:
# Create ParamGrid for Cross Validation
paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [3, 4, 5]) \
    .addGrid(rf.impurity, ['entropy', 'gini'])\
    .build()

In [None]:
crossval_rf = CrossValidator(estimator=pipeline_rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

In [None]:
cvModel_rf = crossval_rf.fit(train)

In [None]:
bestModel_rf = cvModel_rf.bestModel

In [None]:
pred_train_best_rf = bestModel_rf.transform(train)
pred_test_best_rf = bestModel_rf.transform(test)

In [None]:
predictionAndLabels = pred_train_best_rf.select('prediction', 'label')
metricsList_train_best_rf = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Train set:')
print(' ')
print(f'Accuracy: {metricsList_train_best_rf[0]}')
print(f'Precision: {metricsList_train_best_rf[1]}')
print(f'Recall: {metricsList_train_best_rf[2]}')
print(f'F1-score: {metricsList_train_best_rf[3]}')
print(' ')


In [None]:
plt.clf()

ax = plt.subplot()
sns.heatmap(metricsList_train_best_rf[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt


In [None]:
predictionAndLabels = pred_test_best_rf.select('prediction', 'label')
metricsList_test_best_rf = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Test set:')
print(' ')
print(f'Accuracy: {metricsList_test_best_rf[0]}')
print(f'Precision: {metricsList_test_best_rf[1]}')
print(f'Recall: {metricsList_test_best_rf[2]}')
print(f'F1-score: {metricsList_test_best_rf[3]}')
print(' ')


In [None]:
plt.clf()

ax = plt.subplot()
sns.heatmap(metricsList_test_best_rf[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt

In [None]:
bestModel = bestModel_rf.stages[4]
importances = bestModel.featureImportances.toArray()

# Retrieve features' labels.
# OneHotEncoder drops the last category by default (dropLast=True), so 'stateVec' can have
# fewer slots than StringIndexer has labels; derive the number of state slots from the
# length of the importance vector so that names and importances stay aligned.
features_sel = ['gender', 'days_registered', 'avg_songs_per_day', 'last_level', 'Thumbsup_proportion', 'num_add_friend', 'avg_roll_adv_per_day']
n_state_slots = len(importances) - len(features_sel)
features_states = list(bestModel_rf.stages[0].labels)[:n_state_slots]
features_labels = features_states + features_sel
feature_coef_df = pd.DataFrame(list(zip(features_labels, importances)), columns=['Feature', 'Importance'])\
    .sort_values('Importance', ascending=False)

# Plot the feature importance of the best model
plt.figure(figsize=(10, 8), dpi=80)

sns.barplot(x='Importance', y='Feature', data=feature_coef_df, orient = 'h')
plt.xlabel('Importance')
plt.ylabel('Feature')
plt.title('Feature Importance')

%matplot plt

In [None]:
# Check the parameters of the best model
bestModel = bestModel_rf.stages[4]
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
print('impurity - ', bestModel.getOrDefault('impurity'))

In [None]:
# Get the results of cross validation

# get parameters
params = [{p.name: v for p, v in m.items()} for m in cvModel_rf.getEstimatorParamMaps()]

# Convert validation results to pandas dataframe
validation_results_rf = pd.DataFrame.from_dict([
    {cvModel_rf.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, cvModel_rf.avgMetrics)
])

In [None]:
validation_results_rf

### Gradient-Boosted Tree Classifier

In [None]:
# Initialize Gradient-Boosted Tree Classifier
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)

# Assemble pipeline
pipeline_gbt = Pipeline(stages = [stringIndexerState, encoder, assemblers, scalers, gbt])

#### Cross Validation and Parameter Tuning

In [None]:
paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20, 40]) \
    .addGrid(gbt.maxDepth, [2, 3, 5]) \
    .build()

In [None]:
crossval_gbt = CrossValidator(estimator=pipeline_gbt,
                          estimatorParamMaps=paramGrid_gbt,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

In [None]:
cvModel_gbt = crossval_gbt.fit(train)

In [None]:
bestModel_gbt = cvModel_gbt.bestModel

In [None]:
pred_train_best_gbt = bestModel_gbt.transform(train)
pred_test_best_gbt = bestModel_gbt.transform(test)

In [None]:
predictionAndLabels = pred_train_best_gbt.select('prediction', 'label')
metricsList_train_best_gbt = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Train set:')
print(' ')
print(f'Accuracy: {metricsList_train_best_gbt[0]}')
print(f'Precision: {metricsList_train_best_gbt[1]}')
print(f'Recall: {metricsList_train_best_gbt[2]}')
print(f'F1-score: {metricsList_train_best_gbt[3]}')
print(' ')


In [None]:
plt.clf()

ax = plt.subplot()
sns.heatmap(metricsList_train_best_gbt[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt

In [None]:
predictionAndLabels = pred_test_best_gbt.select('prediction', 'label')
metricsList_test_best_gbt = udfModelEvaluator(predictionAndLabels, "label")

print('Metrics Test set:')
print(' ')
print(f'Accuracy: {metricsList_test_best_gbt[0]}')
print(f'Precision: {metricsList_test_best_gbt[1]}')
print(f'Recall: {metricsList_test_best_gbt[2]}')
print(f'F1-score: {metricsList_test_best_gbt[3]}')
print(' ')


In [None]:
plt.clf()

ax = plt.subplot()
sns.heatmap(metricsList_test_best_gbt[4], annot=True, fmt='g', cmap='Blues', ax=ax)
ax.xaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.yaxis.set_ticklabels(['Not Churned', 'Churned'])
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')
plt.show()

%matplot plt

In [None]:
bestModel = bestModel_gbt.stages[4]
importances = bestModel.featureImportances.toArray()

# Retrieve features' labels.
# OneHotEncoder drops the last category by default (dropLast=True), so 'stateVec' can have
# fewer slots than StringIndexer has labels; derive the number of state slots from the
# length of the importance vector so that names and importances stay aligned.
features_sel = ['gender', 'days_registered', 'avg_songs_per_day', 'last_level', 'Thumbsup_proportion', 'num_add_friend', 'avg_roll_adv_per_day']
n_state_slots = len(importances) - len(features_sel)
features_states = list(bestModel_gbt.stages[0].labels)[:n_state_slots]
features_labels = features_states + features_sel
feature_coef_df = pd.DataFrame(list(zip(features_labels, importances)), columns=['Feature', 'Importance'])\
    .sort_values('Importance', ascending=False)

# Plot the feature importance of the best model
plt.figure(figsize=(10, 8), dpi=80)

sns.barplot(x='Importance', y='Feature', data=feature_coef_df, orient = 'h')
plt.xlabel('Importance')
plt.ylabel('Feature')
plt.title('Feature Importance')

%matplot plt

In [None]:
# Get the results of cross validation

# get parameters
params = [{p.name: v for p, v in m.items()} for m in cvModel_gbt.getEstimatorParamMaps()]

# Convert validation results to pandas dataframe
validation_results_gbt = pd.DataFrame.from_dict([
    {cvModel_gbt.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, cvModel_gbt.avgMetrics)
])

In [None]:
validation_results_gbt

## Model Comparison

In [None]:
# Assemble the F1 score of the churned class (label 1) for each model in a dataframe
res = [{'Classifier': 'Logistic Regression', 'Train - F1 score': metricsList_train_best_lr[3], 'Test - F1 score': metricsList_test_best_lr[3]},
       {'Classifier': 'Random Forest', 'Train - F1 score': metricsList_train_best_rf[3], 'Test - F1 score': metricsList_test_best_rf[3]},
       {'Classifier': 'Gradient-boosted Tree', 'Train - F1 score': metricsList_train_best_gbt[3], 'Test - F1 score': metricsList_test_best_gbt[3]}]

results = pd.DataFrame(res)

results