<a href="https://colab.research.google.com/github/OluwajobaOluwabori/Customer-Churn-Prediction/blob/main/Capstone_Project_full.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install the dependencies





In [1]:
'''
Installing Apache Spark 3.3.24 with hadoop 3.0, Java 8 and Findspark in order to locate the spark in the system.
'''

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz -O spark-3.3.4-bin-hadoop3.tgz
!tar -xvzf /content/spark-3.3.4-bin-hadoop3.tgz
!pip install -q findspark


spark-3.3.4-bin-hadoop3/
spark-3.3.4-bin-hadoop3/R/
spark-3.3.4-bin-hadoop3/R/lib/
spark-3.3.4-bin-hadoop3/R/lib/sparkr.zip
spark-3.3.4-bin-hadoop3/R/lib/SparkR/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/worker/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/worker/worker.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/worker/daemon.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/tests/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/tests/testthat/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/tests/testthat/test_basic.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/profile/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/profile/shell.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/profile/general.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/doc/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/doc/sparkr-vignettes.html
spark-3.3.4-bin-hadoop3/R/lib/SparkR/doc/sparkr-vignettes.Rmd
spark-3.3.4-bin-hadoop3/R/lib/SparkR/doc/sparkr-vignettes.R
spark-3.3.4-bin-hadoop3/R/lib/SparkR/doc/index.html
spark-3.3.4-bin-hadoop3/R/lib/SparkR/R/
spark-3.3.4-bin-hadoop3/R/lib/SparkR/R/SparkR
spark-

In [2]:
'''
Setting the environment path that enables PySpark to run in this Colab environment.
'''

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"


In [3]:
'''
Running a local spark session to test the installation.
'''

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
'''
Import libraries
'''

import pyspark.pandas as ps
from pyspark.sql import Window
from pyspark.sql.functions import udf,col,window, max as Fmax
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType,TimestampType
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,DecisionTreeClassifier
import numpy as np
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
import time
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
from builtins import round






In [5]:
# Creating a Spark session

spark = SparkSession \
    .builder \
    .appName("Churn Prediction") \
    .config("spark.ui.port", "4050") \
    .getOrCreate()


TEXT PREPROCESSING

# **1. Load Data into Spark**

In [6]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
#spark.sparkContext.addFile("https://udacity-dsnd.s3.amazonaws.com/sparkify/mini_sparkify_event_data.json")
spark.sparkContext.addFile("https://udacity-dsnd.s3.amazonaws.com/sparkify/sparkify_event_data.json")

#s3n://udacity-dsnd/sparkify/sparkify_event_data.json
#https://udacity-dsnd.s3.amazonaws.com/sparkify/sparkify_event_data.json
data = spark.read.json("file://" +SparkFiles.get("sparkify_event_data.json"))

In [None]:
data.show(30)

+--------------------+----------+---------+------+-------------+---------+----------+-----+--------------------+------+---------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|              artist|      auth|firstName|gender|itemInSession| lastName|    length|level|            location|method|     page| registration|sessionId|                song|status|           ts|           userAgent| userId|
+--------------------+----------+---------+------+-------------+---------+----------+-----+--------------------+------+---------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|           Popol Vuh| Logged In|    Shlok|     M|          278|  Johnson| 524.32934| paid|Dallas-Fort Worth...|   PUT| NextSong|1533734541000|    22683|Ich mache einen S...|   200|1538352001000|"Mozilla/5.0 (Win...|1749042|
|         Los Bunkers| Logged In|  Vianney|     F|            9|   Miller| 238.39302| paid|San Franc

# **Explore and Clean Data**

In [None]:
#data1.where(data1['_corrupt_record'].isNotNull()).show()

In [None]:
# Structure of the dataframe
data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
#Column types
data.dtypes

[('artist', 'string'),
 ('auth', 'string'),
 ('firstName', 'string'),
 ('gender', 'string'),
 ('itemInSession', 'bigint'),
 ('lastName', 'string'),
 ('length', 'double'),
 ('level', 'string'),
 ('location', 'string'),
 ('method', 'string'),
 ('page', 'string'),
 ('registration', 'bigint'),
 ('sessionId', 'bigint'),
 ('song', 'string'),
 ('status', 'bigint'),
 ('ts', 'bigint'),
 ('userAgent', 'string'),
 ('userId', 'string')]

In [None]:
data.count()

26259199

In [None]:
#Number of users
data.select('userId').distinct().count()

22278

In [7]:
'''
 Rows with empty string in the 'userId' column correspond to logs in which user has not been logged in
 that is 'Logged Out' or 'Guest' authentication levels.
 '''

data.where(data['userId']=='').groupby('auth').count().show()

+----+-----+
|auth|count|
+----+-----+
+----+-----+



In [8]:
data.groupby('auth').count().show()
#data1.groupby('auth').count()



+----------+--------+
|      auth|   count|
+----------+--------+
|Logged Out|  774285|
| Cancelled|    5003|
|     Guest|    4194|
| Logged In|25475717|
+----------+--------+



In [9]:
data.groupby('page').count().show()

+--------------------+--------+
|                page|   count|
+--------------------+--------+
|              Cancel|    5003|
|    Submit Downgrade|    6494|
|         Thumbs Down|  239212|
|                Home| 1343102|
|           Downgrade|  184240|
|         Roll Advert|  385212|
|              Logout|  296005|
|       Save Settings|   29516|
|Cancellation Conf...|    5003|
|               About|   92759|
| Submit Registration|     401|
|            Settings|  147074|
|               Login|  296350|
|            Register|     802|
|     Add to Playlist|  597921|
|          Add Friend|  381664|
|            NextSong|20850272|
|           Thumbs Up| 1151465|
|                Help|  155100|
|             Upgrade|   50507|
+--------------------+--------+
only showing top 20 rows



In [12]:
data=data.pandas_api()


In [13]:
#How many users dropped off and what was the level of their account(free or paid)?

data[data.page=='Cancellation Confirmation'].groupby(['level','page']).count()['userId']

level  page                     
free   Cancellation Confirmation    1579
paid   Cancellation Confirmation    3424
Name: userId, dtype: int64

In [14]:
## Number of users that tried downgrading from paid to free and number of users that actually downgraded from paid to free

#Downgraded_users.userId.nunique()
#data[data.page=='Downgrade']['userId'].nunique()
print('{} users tried downgrading from paid to free and {} actually downgraded from paid to free.'.format(data[data.page=='Downgrade']['userId'].nunique(),data[data.page=='Submit Downgrade']['userId'].nunique()))

15209 users tried downgrading from paid to free and 5103 actually downgraded from paid to free.


In [15]:
data=data.to_spark()




# **Create Features**

In [16]:
data = data.withColumn('date', timestamp_seconds(data.ts/1000))
#data1 = data1.withColumn('tsDate', (data1['ts'] / 1000).cast(TimestampType()))


## Define Churn

`Cancellation Confirmation` and `Submit Downgrade` events make up our definition for churn. `Cancellation Confirmation` happens for both paid and free users who cancel the services. `Submit Downgrade` happens for premium users who downgrades to free tier.

In [17]:
# Add the column 'churn' to identify users that churned. 1 for yes, 0 for no
df = data.withColumn('churn', when(data.page == 'Cancellation Confirmation',1).when(data.page == 'Submit Downgrade', 1).otherwise(0))
df = df.withColumn('user_churn', Fmax('churn').over(Window.partitionBy('UserId')))

####Time between registration and drop-off(churn)

In [18]:
#First 2 lines creates a dataframe with a new column for the difference in when user registered and the timestamp of that activity

life_time_df = df.groupby(['user_churn', 'userId', 'ts', 'registration']).count() \
                    .withColumn('life_time', (df.ts - df.registration) / 1000) \
                    .groupBy('userId', 'user_churn').agg({'life_time' : 'max'}).withColumnRenamed('max(life_time)','life_span')
#df.where(df.userId==105).tail(10)
life_time_df.show()
#life_time_df.where(life_time_df.user_churn==1).show()


+-------+----------+-----------+
| userId|user_churn|  life_span|
+-------+----------+-----------+
|1000280|         1|  6679046.0|
|1000353|         1| 1.498609E7|
|1000503|         1|  9039679.0|
|1001129|         1|  7771324.0|
|1001163|         1|  5253389.0|
|1001246|         1|  7317326.0|
|1001393|         0|  5173895.0|
|1001450|         0|  6584243.0|
|1001559|         0|  5507913.0|
|1001607|         1|  5817002.0|
|1001759|         0|  1884881.0|
|1001904|         0|  6092129.0|
|1001963|         0|  3721823.0|
|1002143|         0|  6874233.0|
|1002185|         0|  5680891.0|
|1002295|         0|1.2505832E7|
|1002397|         1|1.0227863E7|
|1002409|         0|  4940810.0|
|1002491|         0|  7667619.0|
|1002493|         1|   918698.0|
+-------+----------+-----------+
only showing top 20 rows



###Gender

In [19]:
features = ps.DataFrame(dtype='int64')

gender = data.groupby('userId').agg(first(when(data['gender']=='M',0).otherwise(1))).withColumnRenamed('first(CASE WHEN (gender = M) THEN 0 ELSE 1 END)', 'gender')
features = gender
features.show()

  fields = [
  for column, series in pdf.iteritems():


+-------+------+
| userId|gender|
+-------+------+
|1390009|     0|
|1519090|     1|
|1394508|     0|
|1178731|     1|
|1351489|     0|
|1358765|     0|
|1500901|     0|
|1718034|     0|
|1384823|     1|
|1083324|     0|
|1633577|     1|
|1875484|     0|
|1492713|     0|
|1658815|     1|
|1114507|     1|
|1331962|     0|
|1588738|     1|
|1338783|     1|
|1965481|     0|
|1057724|     0|
+-------+------+
only showing top 20 rows



###Number of songs each user listened to


In [21]:
no_of_songsdf = data.where(data['page']== 'NextSong').groupby('userId').count()\
                .withColumnRenamed('count', 'songs')

features = features.join(no_of_songsdf, on = 'userId', how='left').withColumn('songs',when(col('songs').isNull(), 0).otherwise(col('songs')))


In [22]:
features = features.join(life_time_df, on = 'userId')


###Number of thumbs up and thumbs down given


In [23]:
#Thumbs_Down = data1.withColumn('Thumbs Down',when(data1.page == 'Thumbs Down', 1).otherwise(0))
Thumbs_Down = data.groupBy('userId').agg(sum(when(data.page == 'Thumbs Down', 1).otherwise(0))).withColumnRenamed('sum(CASE WHEN (page = Thumbs Down) THEN 1 ELSE 0 END)','Thumbs Down')


#data1.filter(data1.page=='Thumbs Down').show()
Thumbs_Up = data.groupBy('userId').agg(sum(when(data.page == 'Thumbs Up', 1).otherwise(0))).withColumnRenamed('sum(CASE WHEN (page = Thumbs Up) THEN 1 ELSE 0 END)','Thumbs Up')


features = features.join(Thumbs_Down, on = 'userId')
features = features.join(Thumbs_Up, on = 'userId')

features.show()

+-------+------+-----+----------+-----------+-----------+---------+
| userId|gender|songs|user_churn|  life_span|Thumbs Down|Thumbs Up|
+-------+------+-----+----------+-----------+-----------+---------+
|1000280|     0| 1022|         1|  6679046.0|         33|       53|
|1000353|     1|  239|         1| 1.498609E7|          4|       13|
|1000503|     1|  191|         1|  9039679.0|          3|        3|
|1001129|     0|  557|         1|  7771324.0|         10|       25|
|1001163|     1|  258|         1|  5253389.0|          2|        8|
|1001246|     1|  687|         1|  7317326.0|          8|       59|
|1001393|     0|  494|         0|  5173895.0|          6|       43|
|1001450|     0|   31|         0|  6584243.0|          2|        1|
|1001559|     1| 1016|         0|  5507913.0|          6|       51|
|1001607|     1| 1929|         1|  5817002.0|         16|      107|
|1001759|     0|   57|         0|  1884881.0|          1|        4|
|1001904|     1| 1380|         0|  6092129.0|   

####Level of User (free/paid)


In [25]:
users_level = data.groupBy('userId').agg(first(when(data['level']=='free',0).otherwise(1)))\
.withColumnRenamed('first(CASE WHEN (level = free) THEN 0 ELSE 1 END)', 'level')\
.na.drop() #or .dropna()
features = features.join(users_level, on = 'userId')


###Location of user

In [26]:
#Split the location column into two new columns - City and State
data = data.withColumn('City',pyspark.sql.functions.split(data['location'], ',').getItem(0)).withColumn('State',pyspark.sql.functions.split(data['location'], ',').getItem(1))
indexer = StringIndexer(inputCol="State", outputCol="StateIndex", handleInvalid="skip")
data = indexer.fit(data).transform(data)
users_location = data.groupBy('userId').agg(last('StateIndex').alias('StateIndex')).na.drop() #or .dropna()
features = features.join(users_location, on = 'userId')


In [27]:
data=data.drop('State')
#data.isnull('StateIndex')


In [28]:
#Users that churned, whats the proportion of free to paid users
#data1.filter(data1['page']=='Help').groupBy('level').count().show()
Visted_help_pg = data.filter(data.page=='Help')
#Help_count = Visted_help_pg.groupby('userId').agg(countDistinct('page').alias('Helps_count')).when(data1.Helps_count == null, 0).otherwise(data1.Helps_count)
Help_count = Visted_help_pg.groupby('userId').agg(countDistinct('page').alias('Help_count'))
features = features.join(Help_count, on = 'userId').withColumn('Help_count',(when(col(('Help_count')).isNull(), 0).otherwise(col(('Help_count')))))

####Users' number of downgrade and upgrades


In [29]:
Downgrade_No = data.groupBy('userId').agg(sum(when(data.page=='Submit Downgrade',1).otherwise(0)).alias('Downgrades'))
Upgrade_No = data.groupBy('userId').agg(sum(when(data.page=='Submit Upgrade',1).otherwise(0)).alias('Upgrades'))
features = features.join(Downgrade_No, on = 'userId')
features = features.join(Upgrade_No, on = 'userId')


Number of songs in users'playlist

In [36]:
Playlist_No = data.groupBy('userId').agg(sum(when(data.page=='Add to Playlist',1).otherwise(0)).alias('Playlist_no'))
features = features.join(Playlist_No, on = 'userId')


Number of friends

In [32]:
Friend_No = data.groupBy('userId').agg(sum(when(data.page=='Add Friend',1).otherwise(0)).alias('Friend_no'))
features = features.join(Friend_No, on = 'userId')

 In a distributed environment it can be a little more complicated to take raw DataFrames as an input for training so we would be using Assemblers to prepare our training data. VectorAssember from Spark ML library is a module that allows to convert numerical features into a single vector that is used by the machine learning models.

In [None]:
assembler = VectorAssembler(
    inputCols=['songs','Thumbs Down','Thumbs Up','Help_count','Downgrades','Upgrades','life_span','Playlist_no','Friend_no'],
    outputCol="features",handleInvalid = "keep" )
output_vector = assembler.transform(features)

scaler = StandardScaler(inputCol='features', outputCol ='num_vector_sc', withStd=True, withMean=True)
output_scaler = scaler.fit(output_vector).transform(output_vector)

final_assembler = VectorAssembler(inputCols=['num_vector_sc'] + ['gender','level'],outputCol='new_features')
final_vector = final_assembler.transform(output_scaler)


In [None]:
#final data
final_data = final_vector.select(final_vector.user_churn, final_vector.new_features)


## **Build models**

In [None]:
##Splitting final data into train and test dataset

train, test = final_data.randomSplit([0.8, 0.2], 42)

In [None]:
# function for printing metric results
def evaluate_result(model_result, model_name, start, end):

    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
    evaluator.setLabelCol('user_churn')
    accuracy = evaluator.evaluate(model_result, {evaluator.metricName : 'accuracy'})
    f1 = evaluator.evaluate(model_result, {evaluator.metricName : 'f1'})
    weightedPrecision = evaluator.evaluate(model_result, {evaluator.metricName : 'weightedPrecision'})
    weightedRecall = evaluator.evaluate(model_result, {evaluator.metricName : 'weightedRecall'})
    time = (end - start)/60
    result = [model_name, round(accuracy,3), round(f1,3), round(time,1)]

    print('{} performance metrics:'.format(model_name))
    print('Accuracy: {:.0%}'.format(accuracy))
    print('F-1 Score: {}'.format(f1))
    print('weightedPrecision: {}'.format(weightedPrecision))
    print('weightedRecall: {}'.format(weightedRecall))
    print('Total training time: {} minutes'.format(time))

    return result

In [None]:
# start training
START = time.time()

##Logistics Regression

In [None]:
log_reg = LogisticRegression(maxIter=10, labelCol='user_churn', featuresCol='new_features')
evaluator = MulticlassClassificationEvaluator(labelCol='user_churn')

pipeline = Pipeline(stages=[log_reg])
log_reg_paramGrid = (ParamGridBuilder()
                .addGrid(log_reg.regParam, [0.1, 0.01, 0.001])
                .addGrid(log_reg.elasticNetParam, [0.1, 0.01, 0.5])
                .build())


crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=log_reg_paramGrid,
    evaluator=evaluator,
    numFolds=3)

log_reg_start = time.time()
log_reg_model = crossval.fit(train)
log_reg_end = time.time()


In [None]:
log_reg_results = log_reg_model.transform(test)

log_reg_safe = evaluate_result(log_reg_results, 'Logistic Regression', log_reg_start, log_reg_end)

best_param = list(log_reg_model.getEstimatorParamMaps()[np.argmax(log_reg_model.avgMetrics)].values())
print('Best regression parameter(regParam) is {}, Best elasticNet parameter(elasticNetParam) is {}'.format(best_param[0],best_param[1]))

Logistic Regression performance metrics:
Accuracy: 91%
F-1 Score: 0.909148565086412
weightedPrecision: 0.9234693877551021
weightedRecall: 0.9142857142857144
Total training time: 20.175902156035104 minutes
Best regression parameter(regParam) is 0.001, Best elasticNet parameter(elasticNetParam) is 0.01


In [None]:
lr_results.show()

## Random Forest

In [None]:
ran_forest = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol='user_churn', featuresCol='new_features',seed=42,leafCol="leafId")
evaluator = MulticlassClassificationEvaluator(labelCol='user_churn')

pipeline = Pipeline(stages=[ran_forest])
ran_forest_paramGrid = (ParamGridBuilder()
                .addGrid(ran_forest.numTrees, [2, 3, 4])
                .addGrid(ran_forest.maxDepth, [2, 3, 4,5])
                .build())


crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=ran_forest_paramGrid,
    evaluator=evaluator,
    numFolds=3)

ran_forest_start = time.time()
ran_forest_model = crossval.fit(train)
ran_forest_end = time.time()


In [None]:
ran_forest_results = ran_forest_model.transform(test)
ran_forest_safe = evaluate_result(ran_forest_results, 'Random Forest', ran_forest_start, ran_forest_end)

best_param = list(ran_forest_model.getEstimatorParamMaps()[np.argmax(ran_forest_model.avgMetrics)].values())
print('Best numTrees parameter is {}, Best maxDepth parameter is {}.'.format(best_param[0],best_param[1]))

Random Forest performance metrics:
Accuracy: 91%
F-1 Score: 0.9128704113224237
weightedPrecision: 0.9133089133089133
weightedRecall: 0.9142857142857143
Total training time: 16.961500136057534 minutes
Best numTrees parameter is 3, Best maxDepth parameter is 5.


## Decision Tree

In [None]:
dec_tree = DecisionTreeClassifier(maxDepth=2,maxBins=32, labelCol='user_churn', featuresCol='new_features', leafCol="leafId")
evaluator = MulticlassClassificationEvaluator(labelCol='user_churn')

pipeline = Pipeline(stages=[dec_tree])
dec_tree_paramGrid = (ParamGridBuilder()
                .addGrid(dec_tree.maxDepth, [2, 3, 4,5])
                .addGrid(dec_tree.maxBins, [2, 10, 32])
                .build())


crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=dec_tree_paramGrid,
    evaluator=evaluator,
    numFolds=3)

dec_tree_start = time.time()
dec_tree_model = crossval.fit(train)
dec_tree_end = time.time()

In [None]:
dec_tree_results = dec_tree_model.transform(test)
dec_tree_safe = evaluate_result(dec_tree_results, 'Decision Tree', dec_tree_start, dec_tree_end)

best_param = list(dec_tree.getEstimatorParamMaps()[np.argmax(dec_tree_model.avgMetrics)].values())
print(' Best maxDepth parameter is {}, Best maxBins parameter is {}.'.format(best_param[0],best_param[1]))

In [None]:
columns=['Model','Accuracy score','F-1 Score','Total training time']
spark.createDataFrame([log_reg,ran_forest,dec_tree_safe],columns).show()
