# Predict Customer Churn using PySpark Machine Learning

This notebook replicates this [notebook](https://github.com/scientist94/Udacity_DSND_Spark/blob/master/Sparkify.ipynb).

Predicting customer churn is a common and challenging problem for data scientists. Being able to predict that a particular customer is at high risk of churning, while there is still time to do something about it, represents a significant potential source of additional revenue for every customer-facing business.

This notebook covers the creation of a machine learning solution which will be able to predict customer churn. This solution will be realized with Apache Spark. We load large datasets into Spark and manipulate them using Spark SQL and Spark Dataframes to engineer relevant features for predicting customer churn, then use the machine learning APIs within Spark ML to build and tune models.

The dataset comes from a music app and records how users interact with the service: playing, liking, and sharing songs, changing account info, and so on.

Our goal is to predict churn, which is encoded in the target variable column 'label'.

In [2]:
#we use the findspark library to locate spark on our local machine
import findspark
findspark.init('C:/Users/bokhy/spark/spark-2.4.6-bin-hadoop2.7')

In [3]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time
import os
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
# Use the F alias instead of wildcard imports, which would shadow
# Python builtins such as max() and round() that are used further below
import pyspark.sql.functions as F

from sklearn.metrics import f1_score, recall_score, precision_score
from pyspark.sql.types import IntegerType, DoubleType, DateType, FloatType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, GBTClassifier, LinearSVC

### 1. Load Data

In [4]:
# Initiate the spark session
# Make sure to set the memory config to avoid hitting memory limits
MAX_MEMORY = "10g"

spark = SparkSession \
    .builder \
    .appName("Churn") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

In [5]:
spark

Memory limits in Spark: [Data Science Stack Exchange](https://datascience.stackexchange.com/questions/8549/how-do-i-set-get-heap-size-for-spark-via-python-notebook)

In [6]:
# read in dataset
path = 'C:\\Users\\bokhy\\Desktop\\Python\\github\\PySpark\\data'  

df = spark.read.json(os.path.join(path, 'churn_data.json'))

In [7]:
display(df)

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [8]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### 2. EDA

In [9]:
# Get amount of unique users
df.select("userId").dropDuplicates().count()

226

In [10]:
# Show unique page views
df.select('page').distinct().collect()

[Row(page='Cancel'),
 Row(page='Submit Downgrade'),
 Row(page='Thumbs Down'),
 Row(page='Home'),
 Row(page='Downgrade'),
 Row(page='Roll Advert'),
 Row(page='Logout'),
 Row(page='Save Settings'),
 Row(page='Cancellation Confirmation'),
 Row(page='About'),
 Row(page='Submit Registration'),
 Row(page='Settings'),
 Row(page='Login'),
 Row(page='Register'),
 Row(page='Add to Playlist'),
 Row(page='Add Friend'),
 Row(page='NextSong'),
 Row(page='Thumbs Up'),
 Row(page='Help'),
 Row(page='Upgrade'),
 Row(page='Error'),
 Row(page='Submit Upgrade')]

Let's look at which pages are visited most often.

In [11]:
df.groupBy('page').count().sort(F.desc('count')).show()

+--------------------+------+
|                page| count|
+--------------------+------+
|            NextSong|228108|
|                Home| 14457|
|           Thumbs Up| 12551|
|     Add to Playlist|  6526|
|          Add Friend|  4277|
|         Roll Advert|  3933|
|               Login|  3241|
|              Logout|  3226|
|         Thumbs Down|  2546|
|           Downgrade|  2055|
|                Help|  1726|
|            Settings|  1514|
|               About|   924|
|             Upgrade|   499|
|       Save Settings|   310|
|               Error|   258|
|      Submit Upgrade|   159|
|    Submit Downgrade|    63|
|Cancellation Conf...|    52|
|              Cancel|    52|
+--------------------+------+
only showing top 20 rows



We can clearly see that “NextSong” is the most popular page view, which makes perfect sense for a music service. However, many other page views will be important for engineering relevant features from this raw dataset. We use the page “Cancellation Confirmation”, which was visited 52 times, to create the label for the machine learning models.

In [12]:
flag_cancellation_event = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn('label', flag_cancellation_event('page'))

Let's look at user activity by hour of the day.

In [13]:
get_hour = F.udf(lambda x: datetime.fromtimestamp(x / 1000.0).hour, IntegerType())
df = df.withColumn('hour', get_hour(df.ts))
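The conversion inside the UDF can be checked outside Spark. The following is a minimal sketch, assuming the `ts` values are Unix timestamps in milliseconds; the helper name `hour_of_event` is hypothetical. Note that `datetime.fromtimestamp` without a `tz` argument uses the local timezone of the machine, so the sketch passes UTC explicitly to make the result reproducible.

```python
from datetime import datetime, timezone

def hour_of_event(ts_ms):
    """Return the hour of day (0-23, in UTC) for a Unix timestamp in milliseconds."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).hour

# 1538407800000 ms corresponds to 2018-10-01 15:30:00 UTC
print(hour_of_event(1538407800000))  # 15
```

If the hours in the plot look shifted, the local timezone of the driver or executors is a likely cause.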

Since matplotlib does not work with PySpark dataframes, we convert the aggregated result to a pandas dataframe and plot the user activity by hour.

In [None]:
# Count the events per hour
songs_by_hour = df.groupBy('hour').count().orderBy(df.hour)
songs_by_hour_pd = songs_by_hour.toPandas()
songs_by_hour_pd.hour = pd.to_numeric(songs_by_hour_pd.hour)

# Plot the events per hour aggregation
plt.scatter(songs_by_hour_pd['hour'], songs_by_hour_pd['count'])
plt.xlim(-1, 24)
plt.ylim(0, 1.2 * songs_by_hour_pd['count'].max())
plt.xlabel('Hour')
plt.ylabel('Events');

### 3. Data PreProcessing

We create a new PySpark dataframe in which each row represents one user. We will create features from the dataframe df and join them sequentially to the new dataframe.

Based on the column label in df we can separate the churned users from the rest.

In [17]:
# Get churned user sets
churned_collect = df.where(df.label==1).select('userId').collect()
churned_users = set([int(float(row.userId)) for row in churned_collect])
churned_df = df.where(F.col('userId').isin(churned_users))

In [27]:
# Get stayed user sets
all_collect = df.where(df.userId != '').select('userId').collect()
all_users = set([int(float(row.userId)) for row in all_collect])

stayed_users = all_users-churned_users
stayed_df =  df.where(F.col('userId').isin(stayed_users))

In [28]:
# Compute average song count per churned user
avg_songs_churned = churned_df.filter(df.page=='NextSong').count()/len(churned_users)
print(avg_songs_churned)

699.8846153846154


In [29]:
# Compute average song count per stayed user
avg_songs_stayed = stayed_df.filter(df.page=='NextSong').count()/len(stayed_users)
print(avg_songs_stayed)

1108.1734104046243


In [30]:
# Get gender proportions for churned users
churned_df.dropDuplicates(subset = ['userId']).groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|   20|
|     M|   32|
+------+-----+



In [31]:
# Get gender proportions for stayed users
stayed_df.dropDuplicates(subset = ['userId']).groupBy('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|     F|   84|
|     M|   89|
+------+-----+



We will now build the features we find promising for training the model.

In [32]:
# Create features dataframe
feature_df = spark.createDataFrame(all_users, IntegerType()).withColumnRenamed('value', 'userId')

In [33]:
# Create binary gender column
convert_gender = F.udf(lambda x: 1 if x == 'M' else 0, IntegerType())
df = df.withColumn('GenderBinary', convert_gender('Gender'))

# Add gender as feature
feature_df = feature_df.join(df.select(['userId', 'GenderBinary']), 'userId') \
    .dropDuplicates(subset=['userId']) \
    .sort('userId')

In [34]:
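# Create binary customer level column (1 = free, 0 = paid)
convert_level = F.udf(lambda x: 1 if x == 'free' else 0, IntegerType())
df = df.withColumn('LevelBinary', convert_level('level'))

# Add customer level as feature, keeping each user's most recent level.
# A window with row_number() is used because sort() followed by
# dropDuplicates() does not guarantee which row per user is kept.
from pyspark.sql import Window
latest_first = Window.partitionBy('userId').orderBy(F.desc('ts'))
latest_level = df.select('userId', 'ts', 'LevelBinary') \
    .withColumn('rn', F.row_number().over(latest_first)) \
    .where(F.col('rn') == 1) \
    .drop('ts', 'rn')
feature_df = feature_df.join(latest_level, 'userId')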

In [35]:
# Create label column
create_churn = F.udf(lambda x: 1 if x in churned_users else 0, IntegerType())
feature_df = feature_df.withColumn('label', create_churn('userId'))

In [39]:
feature_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- GenderBinary: integer (nullable = true)
 |-- LevelBinary: integer (nullable = true)
 |-- label: integer (nullable = true)



Encode Page Views as Features

Every time the users interact with the platform, it generates data. This means that we know exactly what each of the users experienced during the period of this data extract. My approach is to divide the pages into categories:

- Neutral pages: “Cancel”, “Home”, “Logout”, “Save Settings”, “About”, “Settings”

- Negative pages: “Thumbs Down”, “Roll Advert”, “Help”, “Error”

- Positive pages: “Add to Playlist”, “Add Friend”, “NextSong”, “Thumbs Up”

- Downgrade pages: “Submit Downgrade”, “Downgrade”

- Upgrade pages: “Submit Upgrade”, “Upgrade”

In [40]:
# Create a dictionary which maps page categories to filtered PySpark dataframes
pages = {}
pages['neutralPages'] = df.filter((df.page == 'Cancel') | (df.page == 'Home') | (df.page == 'Logout') \
    | (df.page == 'Save Settings') | (df.page == 'About') | (df.page == 'Settings'))
pages['negativePages'] = df.filter((df.page == 'Thumbs Down') | (df.page == 'Roll Advert') | (df.page == 'Help') \
    | (df.page == 'Error'))
pages['positivePages'] = df.filter((df.page == 'Add to Playlist') | (df.page == 'Add Friend') | (df.page == 'NextSong') \
    | (df.page == 'Thumbs Up'))
pages['downgradePages'] = df.filter((df.page == 'Submit Downgrade') | (df.page == 'Downgrade'))
pages['upgradePages'] = df.filter((df.page == 'Upgrade') | (df.page == 'Submit Upgrade'))

In [41]:
# Loop through page views and aggregate the counts by user
for key, value in pages.items():
    value_df = value.select('userId') \
        .groupBy('userId') \
        .agg({'userId':'count'}) \
        .withColumnRenamed('count(userId)', key)
    
    # Add page view aggregations as features
    feature_df = feature_df.join(value_df, 'userId', 'left').sort('userId') \
        .fillna({key: 0})
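The same aggregation logic can be illustrated without Spark. The sketch below, which uses made-up toy events and the hypothetical names `PAGE_CATEGORY` and `count_categories`, maps each page to its category in a single dictionary and counts the events per user and category. Users without events in a category get a count of 0, which mirrors the `fillna` step above.

```python
from collections import Counter, defaultdict

# Page-to-category mapping, taken from the category list above
PAGE_CATEGORY = {
    'Cancel': 'neutralPages', 'Home': 'neutralPages', 'Logout': 'neutralPages',
    'Save Settings': 'neutralPages', 'About': 'neutralPages', 'Settings': 'neutralPages',
    'Thumbs Down': 'negativePages', 'Roll Advert': 'negativePages',
    'Help': 'negativePages', 'Error': 'negativePages',
    'Add to Playlist': 'positivePages', 'Add Friend': 'positivePages',
    'NextSong': 'positivePages', 'Thumbs Up': 'positivePages',
    'Submit Downgrade': 'downgradePages', 'Downgrade': 'downgradePages',
    'Submit Upgrade': 'upgradePages', 'Upgrade': 'upgradePages',
}

def count_categories(events):
    """events: iterable of (user_id, page) pairs -> {user_id: Counter of category counts}."""
    counts = defaultdict(Counter)
    for user_id, page in events:
        category = PAGE_CATEGORY.get(page)
        if category is not None:  # pages outside the five categories are ignored
            counts[user_id][category] += 1
    return counts

# Toy events for illustration only (not taken from the dataset)
sample_events = [(1, 'NextSong'), (1, 'Thumbs Up'), (1, 'Error'), (2, 'Home'), (2, 'Login')]
result = count_categories(sample_events)
print(result[1]['positivePages'])  # 2
print(result[2]['upgradePages'])   # 0
```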

In [42]:
def delta_time(x, y):
    '''
    This function computes the timedelta in days between two unix timestamps.
    
    INPUT:
    x, y - timestamps
    
    OUTPUT:
    Timedelta in days    
    '''
    # datetime is the class imported via "from datetime import datetime"
    val1 = datetime.fromtimestamp(x/1000.0)
    val2 = datetime.fromtimestamp(y/1000.0)
    delta = val2-val1
    delta_days = delta.days
    if delta_days == 0:
        return 1
    return delta_days

# Create UDF for delta_time
delta = F.udf(delta_time, IntegerType())
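As a quick sanity check of the day computation, here is a minimal standard-library sketch. It assumes millisecond timestamps with the first timestamp not later than the last one, and the name `active_days` is hypothetical. It works on the timestamp difference directly, which avoids any dependence on the local timezone.

```python
from datetime import timedelta

def active_days(first_ts_ms, last_ts_ms):
    """Whole days between two millisecond timestamps, with a minimum of 1."""
    days = timedelta(milliseconds=last_ts_ms - first_ts_ms).days
    return max(days, 1)

MS_PER_DAY = 24 * 60 * 60 * 1000
print(active_days(0, 3 * MS_PER_DAY))  # 3
print(active_days(0, 60 * 60 * 1000))  # 1 (less than a day is counted as one day)
```

Returning at least 1 keeps the later division by the active time from dividing by zero.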

In [43]:
# Create dataframe with users and first timestamp
min_date_df =  df.select('userId', 'ts') \
    .groupby('userId') \
    .agg(F.min('ts'))

# Create dataframe with users and last timestamp
max_date_df = df.select('userId', 'ts') \
    .groupby('userId') \
    .agg(F.max('ts'))

# Create dataframe which contains time in days between first and last timestamp per user
delta_df = min_date_df.join(max_date_df, 'userId')
delta_df = delta_df.withColumn('UserActiveTime', delta('min(ts)', 'max(ts)')).drop('min(ts)', 'max(ts)')

# Add UserActiveTime as feature
feature_df = feature_df.join(delta_df, 'userId', 'left').sort('userId') \
    .fillna({'UserActiveTime': 1})

In [46]:
feature_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- GenderBinary: integer (nullable = true)
 |-- LevelBinary: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- neutralPages: long (nullable = true)
 |-- negativePages: long (nullable = true)
 |-- positivePages: long (nullable = true)
 |-- downgradePages: long (nullable = true)
 |-- upgradePages: long (nullable = true)
 |-- UserActiveTime: integer (nullable = true)



In [45]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- GenderBinary: integer (nullable = true)
 |-- LevelBinary: integer (nullable = true)



In [44]:
# Derive the calendar date of each event from the millisecond timestamp
df = df.withColumn('date', F.to_date((F.col('ts') / 1000).cast('timestamp')))

# Create dataframe with users and date counts
dateCount_df = df.select('userId', 'date') \
    .groupby('userId') \
    .agg(F.countDistinct('date').alias('dateCount'))

# Add date count as feature
feature_df = feature_df.join(dateCount_df, 'userId', 'left').sort('userId') \
        .fillna({'dateCount': 1})

These page view features are absolute counts of occurrences. This can be misleading if some users signed up near the end of the data extract while others used the platform from the very beginning. To make the aggregated results comparable, we divide them by the user-specific time window, obtaining counts per day. The columns containing the absolute values are dropped afterwards.

In [None]:
# Normalize the page view features by dividing by UserActiveTime
for i in ['neutralPages', 'negativePages', 'positivePages', 'downgradePages', 'upgradePages', 'dateCount']:
    feature_df = feature_df.withColumn(i+'Normalized', feature_df[i]/feature_df.UserActiveTime).drop(i)
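To see why the normalization matters, consider two hypothetical users (the numbers are made up for illustration): one with 200 positive page views in 10 active days and one with 300 in 60 active days. The helper name `per_day` is an assumption.

```python
def per_day(count, active_days):
    """Convert an absolute event count into events per active day."""
    return count / active_days

# Hypothetical users, for illustration only
recent_user = per_day(200, 10)    # fewer events in total, but a short history
longtime_user = per_day(300, 60)  # more events in total, spread over a long history

print(recent_user)    # 20.0
print(longtime_user)  # 5.0
```

The absolute counts rank the long-time user higher, while the per-day rates show that the recent user is four times as active.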

Create Features on User Activity by Hour and by Day

In [None]:
# Create dataframe with users and mean hour
hour_df = df.select('userId', 'hour') \
    .groupby('userId') \
    .agg({'hour': 'mean'}) \
    .withColumnRenamed('avg(hour)', 'hourAvg')

# Add mean hour as feature
feature_df = feature_df.join(hour_df, 'userId')

Encode User Activity over Time as Slope Feature

In [None]:
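# Create dataframe with users and their activity per day
# (the date is derived from the millisecond timestamp 'ts')
activity_df = df.select('userId', F.to_date((F.col('ts') / 1000).cast('timestamp')).alias('date')) \
    .groupby('userId', 'date') \
    .count()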

# Create Spark dataframe with all users
activity_user = spark.createDataFrame(all_users, IntegerType()).withColumnRenamed('value', 'userId')

In [None]:
# initialize slopes
slopes = []
for user in all_users:
    # Create pandas dataframe for slope calculation
    activity_pandas = activity_df.filter(activity_df['userID'] == user).sort(F.asc('date')).toPandas()
    if activity_pandas.shape[0]==1:
        slopes.append(0)
        continue
    # Fit a line through the user activity counts and retrieve its slope
    slope = np.polyfit(activity_pandas.index, activity_pandas['count'], 1)[0]
    slopes.append(slope)
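The slope returned by a degree-1 polynomial fit is the ordinary least-squares slope of the daily counts against their positions 0, 1, ..., n-1. A minimal pure-Python sketch of that formula follows; the name `fit_slope` and the toy counts are assumptions made for illustration.

```python
def fit_slope(counts):
    """Least-squares slope of counts against their positions 0, 1, ..., n-1."""
    n = len(counts)
    if n < 2:
        return 0.0  # a single active day carries no trend, as in the loop above
    mean_x = (n - 1) / 2
    mean_y = sum(counts) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in enumerate(counts))
    var = sum((x - mean_x) ** 2 for x in range(n))
    return cov / var

print(fit_slope([10, 20, 30, 40]))  # 10.0  (activity grows by 10 events per active day)
print(fit_slope([40, 30, 20, 10]))  # -10.0 (activity declines)
```

A negative slope indicates declining activity, which is the pattern we expect to be associated with churn.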

In [None]:
def outliers_iqr(data):
    '''
    This function returns the indices of outliers in data.
    Despite its name, it applies a two-standard-deviation rule
    (mean +/- 2 * std) rather than an interquartile-range rule.
    
    INPUT:
    data - list containing values
    
    OUTPUT:
    Tuple containing an array of outlier indices
    '''
    data = np.asarray(data, dtype=float)
    avg = np.mean(data)
    std = np.std(data)
    lower_bound = avg - 2*std
    upper_bound = avg + 2*std
    return np.where((data > upper_bound) | (data < lower_bound))

# Set outlier slopes to zero
for i in outliers_iqr(slopes)[0]:
    slopes[i] = 0
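For comparison, an interquartile-range rule could look like the following sketch. This is an alternative technique, not what the cell above does; the name `iqr_outlier_indices`, the factor `k=1.5` and the toy values are assumptions. It relies on `statistics.quantiles`, which requires Python 3.8 or later.

```python
import statistics

def iqr_outlier_indices(values, k=1.5):
    """Indices of values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [i for i, v in enumerate(values) if v < lower or v > upper]

# Toy values for illustration only
toy_slopes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]
print(iqr_outlier_indices(toy_slopes))  # [9]
```

Quartiles are less affected by extreme values than the mean and standard deviation, so this rule is often more robust when a few slopes are very large.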

In [None]:
# Create dataframe that pairs each user with its slope.
# all_users is iterated in the same order as in the slope loop above, so zip()
# keeps users and slopes aligned. Joining on monotonically_increasing_id() is
# not reliable because the generated ids depend on how each dataframe is partitioned.
user_slopes = spark.createDataFrame(
    [(int(user), float(slope)) for user, slope in zip(all_users, slopes)],
    ['userId', 'Slope'])

# Add slopes as feature
feature_df = feature_df.join(user_slopes, 'userId').sort('userId')

Feature Scaling, Merge Columns to one Features Vector

In [None]:
# UDF for converting column type from vector to double type
# (np.round is used so the UDF does not depend on which round() is in scope)
unlist = F.udf(lambda x: float(np.round(list(x)[0], 3)), DoubleType())

# Iterate over columns to be scaled
for i in ['neutralPagesNormalized', 'negativePagesNormalized', 'positivePagesNormalized', \
          'downgradePagesNormalized', 'upgradePagesNormalized', 'dateCountNormalized', \
         'hourAvg', 'UserActiveTime', 'Slope']:
    # VectorAssembler Transformation - Convert column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    feature_df = pipeline.fit(feature_df).transform(feature_df) \
        .withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

# Merge columns to one feature vector
assembler = VectorAssembler(inputCols=['neutralPagesNormalized_Scaled', 'negativePagesNormalized_Scaled', 'positivePagesNormalized_Scaled', \
                                        'downgradePagesNormalized_Scaled', 'upgradePagesNormalized_Scaled', 'dateCountNormalized_Scaled', \
                                        'hourAvg_Scaled', 'UserActiveTime_Scaled', 'Slope_Scaled', 'LevelBinary', \
                                       'GenderBinary'], outputCol='features')
feature_df = assembler.transform(feature_df)
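Min-max scaling maps each value x to (x - min) / (max - min), so every scaled feature lies between 0 and 1. The pure-Python sketch below illustrates the formula; the name `min_max_scale` is hypothetical, and mapping a constant column to 0.5 reflects our reading of the Spark `MinMaxScaler` documentation for the default output range, so treat that detail as an assumption.

```python
def min_max_scale(values):
    """Rescale values linearly to the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant column: no spread to rescale, use the midpoint of [0, 1]
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

print(min_max_scale([2.0, 4.0, 6.0]))  # [0.0, 0.5, 1.0]
```

Because the minimum and maximum are taken from the data, a single extreme value compresses all other values towards 0, which is one reason for treating outlier slopes beforehand.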

### 4. Create a Model

After the creation of features, we can move on and split the full dataset into training and testing. We will test out several common machine learning methods used for classification tasks. The accuracy of the models will be evaluated and parameters tuned accordingly. Based on the F1-Score, Precision, Recall and ROC Area we will determine the winning model.

Split the dataset

In [None]:
# Split the features dataframe into training and testing sets
train, test = feature_df.randomSplit([0.7, 0.3], seed = 42)

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [None]:
# Check for imbalance in the label distribution
plt.hist(feature_df.toPandas()['label'])
plt.show()

#### Machine Learning Hyperparameter Tuning and Evaluation

Spark's MLlib supports tools for model selection such as CrossValidator. This requires an estimator, a set of parameters and an evaluator. The estimators and parameters will be set for each classifier specifically. For evaluation, we take the BinaryClassificationEvaluator which supports both the 'areaUnderROC' and the 'areaUnderPR'. Since we have a class imbalance in the data, we take the 'areaUnderPR' as our evaluation metric (cf. http://pages.cs.wisc.edu/~jdavis/davisgoadrichcamera2.pdf).

In terms of evaluation, Spark's MLlib provides metrics to assess the performance of the trained machine learning algorithms.

However, the class BinaryClassificationEvaluator from pyspark.ml.evaluation only provides the metrics 'areaUnderPR' and 'areaUnderROC'. Therefore, we will compute the F1-Score, Precision and Recall with sklearn.
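For reference, the three metrics follow directly from the counts of true positives, false positives and false negatives. The sketch below is a plain-Python illustration with made-up labels and predictions; the name `binary_metrics` is hypothetical and the code is not a replacement for the sklearn functions used later.

```python
def binary_metrics(labels, predictions):
    """Return (precision, recall, f1) for binary labels, with 1 as the positive class."""
    tp = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, predictions) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, predictions) if y == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Toy data for illustration: 3 churned users, 2 of them detected, 1 false alarm
labels      = [1, 1, 1, 0, 0, 0, 0, 0]
predictions = [1, 1, 0, 1, 0, 0, 0, 0]
precision, recall, f1 = binary_metrics(labels, predictions)
print(round(precision, 3), round(recall, 3), round(f1, 3))  # 0.667 0.667 0.667
```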

In [None]:
# Create binary evaluator object
evaluator = BinaryClassificationEvaluator(metricName = 'areaUnderPR')

In [None]:
# Calculate a balancing ratio to account for the class imbalance
balancing_ratio = train.filter(train['label']==0).count()/train.count()
train=train.withColumn("classWeights", F.when(train.label == 1,balancing_ratio).otherwise(1-balancing_ratio))
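The effect of these weights can be verified on a toy example. In the sketch below the labels are made up and the name `class_weights` is hypothetical. Each class receives the share of the other class as its weight, so both classes end up with the same total weight.

```python
def class_weights(labels):
    """Return (weight for label 1, weight for label 0), following the cell above."""
    balancing_ratio = sum(1 for y in labels if y == 0) / len(labels)
    return balancing_ratio, 1 - balancing_ratio

# Toy labels for illustration: 8 users stayed, 2 churned
toy_labels = [0] * 8 + [1] * 2
w_pos, w_neg = class_weights(toy_labels)
print(w_pos)  # 0.8

# Total weight per class is (approximately) equal: 2 * 0.8 vs. 8 * 0.2
total_pos = w_pos * toy_labels.count(1)
total_neg = w_neg * toy_labels.count(0)
print(abs(total_pos - total_neg) < 1e-9)  # True
```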

##### 1. Logistic Regression

In [None]:
# Create a logistic regression object
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', weightCol="classWeights")

In [None]:
# Train the logistic regression model without parameter tuning
lrModel = lr.fit(train)
trainingSummary = lrModel.summary

In [None]:
# Plot the threshold-recall curve
tr = trainingSummary.recallByThreshold.toPandas()
plt.plot(tr['threshold'], tr['recall'])
plt.xlabel('Threshold')
plt.ylabel('Recall')
plt.show()

In [None]:
# Plot the threshold-precision curve
tp = trainingSummary.precisionByThreshold.toPandas()
plt.plot(tp['threshold'], tp['precision'])
plt.xlabel('Threshold')
plt.ylabel('Precision')
plt.show()

In [None]:
# Plot the recall-precision curve
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'], pr['precision'])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()

In [None]:
# Plot the threshold-F-Measure curve
fm = trainingSummary.fMeasureByThreshold.toPandas()
plt.plot(fm['threshold'], fm['F-Measure'])
plt.xlabel('Threshold')
plt.ylabel('F-1 Score')
plt.show()

In [None]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))
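Before running a grid search it helps to estimate its cost. The following sketch counts the parameter combinations of the grid above and the number of model fits during 5-fold cross validation. The variable names are chosen for illustration, and the remark about the final refit reflects our understanding of how `CrossValidator` behaves.

```python
from itertools import product

# Values copied from the parameter grid above
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]
num_folds = 5

grid = list(product(reg_params, elastic_net_params, max_iters))
fits_during_cv = len(grid) * num_folds

print(len(grid))       # 27
print(fits_during_cv)  # 135
```

After cross validation the best parameter combination is fitted once more on the full training set, so the total is one fit higher.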

In [None]:
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

##### 2. Gradient-Boosting Classifier

In [None]:
# Create gradient-boosted tree classifier object
gbt = GBTClassifier(featuresCol = 'features', labelCol = 'label')

In [None]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))

In [None]:
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

##### 3. Decision Tree Classifier

In [None]:
# Create decision tree classifier
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label')

In [None]:
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 4, 6])
             .addGrid(dt.maxBins, [20, 60])
             .addGrid(dt.impurity, ['gini', 'entropy'])
             .build())
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
predictions_pandas = predictions.toPandas()
print('Test Area Under PR: ', evaluator.evaluate(predictions))

In [None]:
# Calculate and print f1, recall and precision scores
f1 = f1_score(predictions_pandas.label, predictions_pandas.prediction)
recall = recall_score(predictions_pandas.label, predictions_pandas.prediction)
precision = precision_score(predictions_pandas.label, predictions_pandas.prediction)

print('F1-Score: {}, Recall: {}, Precision: {}'.format(f1, recall, precision))

### Conclusion
The goal of this project was to exploit the capabilities of Apache Spark's analytics engine for large-scale data processing to detect customers who are about to stop using Sparkify's music streaming service.

We applied the typical steps of the data science process: understanding the data, data preparation, modeling and evaluation. The logistic regression model shows the highest performance (F1-Score: 0.66, Recall: 0.84, Precision: 0.54). We are able to recall 84% of the churning customers and can provide them with special offers to keep them from deleting their Sparkify accounts. However, we need to consider the moderate Precision score of 54%. This means that, of all the customers who would receive special offers, 46% were actually satisfied with the service and would not need any special treatment.