<a href="https://colab.research.google.com/github/amritnaruto/Hands-on-with-Apache-Spark/blob/master/Sparkify_Churn_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup Apache Spark

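This notebook is meant to run in Google Colab: the cells below install Java 8 and Spark 3.0.0, point `JAVA_HOME`/`SPARK_HOME` at them, and use `findspark` to make PySpark importable.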

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop2.7",
})

In [None]:
import findspark

# Make `pyspark` importable from the Spark install that SPARK_HOME points to.
findspark.init()

In [None]:
from pyspark.sql import SparkSession

# Local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('Sparkify-Churn-Prediction')
    .getOrCreate()
)

In [None]:
# Effective Spark configuration as a list of (key, value) pairs.
spark_conf = spark.sparkContext.getConf()
spark_conf.getAll()

[('spark.driver.port', '37325'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.host', '8d299eff4459'),
 ('spark.app.id', 'local-1596633031240'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Sparkify-Churn-Prediction'),
 ('spark.ui.showConsoleProgress', 'true')]

# Get the dataset

In [None]:
!wget 'https://github.com/angang-li/sparkify/raw/master/mini_sparkify_event_data.json'

--2020-08-05 13:10:34--  https://github.com/angang-li/sparkify/raw/master/mini_sparkify_event_data.json
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://media.githubusercontent.com/media/angang-li/sparkify/master/mini_sparkify_event_data.json [following]
--2020-08-05 13:10:35--  https://media.githubusercontent.com/media/angang-li/sparkify/master/mini_sparkify_event_data.json
Resolving media.githubusercontent.com (media.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to media.githubusercontent.com (media.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 128477301 (123M) [application/octet-stream]
Saving to: ‘mini_sparkify_event_data.json’


2020-08-05 13:10:42 (96.7 MB/s) - ‘mini_sparkify_event_data.json’ saved [128477301/128477301]



In [None]:
# Load the raw event log into a DataFrame (schema is inferred from the JSON).
df = spark.read.format('json').load('mini_sparkify_event_data.json')

In [None]:
# Preview the first 20 events (wide columns are truncated).
df.show(n=20)

+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|              artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|           page| registration|sessionId|                song|status|           ts|           userAgent|userId|
+--------------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+---------------+-------------+---------+--------------------+------+-------------+--------------------+------+
|      Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|     Bakersfield, CA|   PUT|       NextSong|1538173362000|       29|           Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
|    Five Iron Frenzy|Logged In|    Micah|     M|           79|    Long|236.09424| free|Bost

In [None]:
# Print the inferred schema (column names, types, nullability) of the event log.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [None]:
# (rows, columns) of the raw event log, like pandas' DataFrame.shape.
n_rows = df.count()
n_cols = len(df.columns)
(n_rows, n_cols)

(286500, 18)

# Analysis

In [None]:
# Peek at the userId column.
df.select(df['userId']).show()

+------+
|userId|
+------+
|    30|
|     9|
|    30|
|     9|
|    30|
|     9|
|     9|
|    30|
|    30|
|    30|
|     9|
|     9|
|    30|
|     9|
|     9|
|    30|
|     9|
|    74|
|    30|
|     9|
+------+
only showing top 20 rows



In [None]:
# How many events carry an empty userId?
# (where() is an alias for filter(); either spelling works.)
df.where(df['userId'] == '').count()

8346

In [None]:
# Keep only events that belong to an identified user; events without a userId
# cannot be attributed to a user and labelled.
df = df.where(df.userId != '')

In [None]:
# A user counts as churned if they ever confirmed a cancellation OR submitted a
# downgrade; keep one (arbitrary) such event per user.
# NOTE(review): because 'Submit Downgrade' is part of the label, the later
# `num_downgrade` feature (count of 'Submit Downgrade' events) is non-zero only
# for churned users -- the label leaks into the features.
churn_pages = ['Cancellation Confirmation', 'Submit Downgrade']
churn_record = df.filter(df['page'].isin(churn_pages)) \
                 .dropDuplicates(['userId'])

churn_record.show()

+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+
|artist|     auth|firstName|gender|itemInSession| lastName|length|level|            location|method|                page| registration|sessionId|song|status|           ts|           userAgent|userId|
+------+---------+---------+------+-------------+---------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+
|  null|Cancelled|    Mason|     M|           10|     Hart|  null| free|  Corpus Christi, TX|   GET|Cancellation Conf...|1533157139000|      174|null|   200|1539318918000|"Mozilla/5.0 (Mac...|   125|
|  null|Cancelled|    Ethan|     M|          176|  Johnson|  null| paid|Lexington-Fayette...|   GET|Cancellation Conf...|1538080987000|      934|null|   200|1539761972000|"Mozilla/5.0 (Win...|    51|


In [None]:
# Pull the churned user ids to the driver as a plain Python list.
churn_users = [row['userId'] for row in churn_record.select('userId').collect()]

type(churn_users)

list

In [None]:
# Partition the events into churned users' events and everybody else's.
is_churned_user = df['userId'].isin(churn_users)

df_churn = df.filter(is_churned_user)
df_not_churn = df.filter(~is_churned_user)

In [None]:
# Label every event: churn = 1 for churned users, churn = 0 otherwise.
from pyspark.sql.functions import lit

df_churn, df_not_churn = (
    df_churn.withColumn("churn", lit(1)),
    df_not_churn.withColumn("churn", lit(0)),
)

In [None]:
# Filter users who have cancellation log
# NOTE(review): disabled alternative churn definition (cancellations only), kept
# for reference. It filters `df_u`, which is not defined in any cell shown in this
# notebook (and mixes `df_u.filter` with a `df[...]` condition), so it would need
# fixing before being uncommented.
# cancel_record = df.filter(df['page']=='Cancellation Confirmation')

# canceled_users = cancel_record.select('userId') \
#                     .rdd.flatMap(lambda x: x) \
#                     .collect()

# df_cancel = df_u.filter(df['userId'].isin(canceled_users))

In [None]:
# Filter users who have experience in premium service
# NOTE(review): disabled exploration cell, kept for reference. It reads `df_u`,
# which is not defined in any cell shown in this notebook, so it would raise
# NameError if uncommented as-is.
# paid_users = df_u.filter(df_u['level']=='paid') \
#                 .select('userId') \
#                 .rdd.flatMap(lambda x: x) \
#                 .collect()

# df_paid = df_u.filter(df_u['userId'].isin(paid_users))

In [None]:
# Stack the two labelled halves back together (same schema, so a positional
# union is safe) and keep only the columns the features are built from.
feature_source_cols = ['userId', 'gender', 'level', 'page', 'churn']
df_2 = df_churn.union(df_not_churn).select(*feature_source_cols)

df_2.show()

+------+------+-----+---------------+-----+
|userId|gender|level|           page|churn|
+------+------+-----+---------------+-----+
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|    30|     M| paid|       NextSong|    1|
|    30|     M| paid|Add to Playlist|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|     9|     M| free|    Roll Advert|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|     9|     M| free|      Thumbs Up|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       NextSong|    1|
|    74|     F| free|       NextSong|    1|
|    30|     M| paid|       NextSong|    1|
|     9|     M| free|       Next

In [None]:
# Pivot the level column so each user gets exactly ONE row holding the number of
# events logged on the free tier and on the paid tier (null when there are none).
# Grouping by userId alone -- not by (userId, level) -- avoids emitting one
# half-null row per level, which would multiply rows in the joins further down.
df_level_cnt = df_2.groupby('userId') \
                   .pivot('level', ['free', 'paid']) \
                   .count() \
                   .orderBy('userId')

df_level_cnt.show()

+------+----+----+
|userId|free|paid|
+------+----+----+
|    10|null| 795|
|   100| 300|null|
|   100|null|2914|
|100001| 187|null|
|100002|null| 218|
|100003|  78|null|
|100004| 691|null|
|100004|null| 554|
|100005| 216|null|
|100006|  44|null|
|100007|null| 520|
|100008|  73|null|
|100008|null| 867|
|100009|null| 324|
|100009| 347|null|
|100010| 381|null|
|100011|  23|null|
|100012|null| 255|
|100012| 345|null|
|100013| 303|null|
+------+----+----+
only showing top 20 rows



In [None]:
# Every distinct page type present in the labelled events.
df_2.select('page').distinct().show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



In [None]:
# Pages whose per-user event counts become model features. The list is in
# alphabetical order, which keeps the same column order as an un-listed pivot.
feature_pages = ['Add Friend', 'Add to Playlist', 'Downgrade', 'NextSong',
                 'Submit Downgrade', 'Submit Upgrade', 'Thumbs Down', 'Thumbs Up']

# One row per user with a count column per page (null when the user never hit it).
# Grouping by userId alone avoids one mostly-null row per (user, page) pair, and
# passing explicit pivot values saves Spark a pass to discover them.
df_page_cnt = df_2.filter(df_2['page'].isin(feature_pages)) \
                  .groupby('userId') \
                  .pivot('page', feature_pages) \
                  .count() \
                  .orderBy('userId')

df_page_cnt.show()

+------+----------+---------------+---------+--------+----------------+--------------+-----------+---------+
|userId|Add Friend|Add to Playlist|Downgrade|NextSong|Submit Downgrade|Submit Upgrade|Thumbs Down|Thumbs Up|
+------+----------+---------------+---------+--------+----------------+--------------+-----------+---------+
|    10|      null|           null|        7|    null|            null|          null|       null|     null|
|    10|      null|              9|     null|    null|            null|          null|       null|     null|
|    10|      null|           null|     null|     673|            null|          null|       null|     null|
|    10|      null|           null|     null|    null|            null|          null|       null|       37|
|    10|      null|           null|     null|    null|            null|          null|          4|     null|
|    10|        12|           null|     null|    null|            null|          null|       null|     null|
|   100|      null|

In [None]:
# Reduce the labelled events to their distinct (userId, gender, churn) rows first,
# so the joins below don't multiply every raw event row.
user_base = df_2.select('userId', 'gender', 'churn').dropDuplicates()

# Attach the level and page counts. Sort LAST: dropDuplicates shuffles the data,
# so any ordering applied before it is lost.
df_3 = user_base \
       .join(df_level_cnt, 'userId', 'inner') \
       .join(df_page_cnt, 'userId', 'inner') \
       .dropDuplicates() \
       .orderBy('userId')

df_3.show()

+------+------+-----+----+----+----------+---------------+---------+--------+----------------+--------------+-----------+---------+
|userId|gender|churn|free|paid|Add Friend|Add to Playlist|Downgrade|NextSong|Submit Downgrade|Submit Upgrade|Thumbs Down|Thumbs Up|
+------+------+-----+----+----+----------+---------------+---------+--------+----------------+--------------+-----------+---------+
|100010|     F|    0| 381|null|         4|           null|     null|    null|            null|          null|       null|     null|
|100010|     F|    0| 381|null|      null|           null|     null|    null|            null|          null|          5|     null|
|100010|     F|    0| 381|null|      null|           null|     null|     275|            null|          null|       null|     null|
|100010|     F|    0| 381|null|      null|              7|     null|    null|            null|          null|       null|     null|
|100010|     F|    0| 381|null|      null|           null|     null|    null

In [None]:
# Collapse to a single row per user. max() ignores nulls, so each pivoted count
# column reduces to that user's count, and the per-user churn label is kept.
df_4 = df_3.groupBy('userId', 'gender').max()

df_4.show()

+------+------+----------+---------+---------+---------------+--------------------+--------------+-------------+---------------------+-------------------+----------------+--------------+
|userId|gender|max(churn)|max(free)|max(paid)|max(Add Friend)|max(Add to Playlist)|max(Downgrade)|max(NextSong)|max(Submit Downgrade)|max(Submit Upgrade)|max(Thumbs Down)|max(Thumbs Up)|
+------+------+----------+---------+---------+---------------+--------------------+--------------+-------------+---------------------+-------------------+----------------+--------------+
|100010|     F|         0|      381|     null|              4|                   7|          null|          275|                 null|               null|               5|            17|
|200002|     M|         0|      120|      354|              4|                   8|             5|          387|                 null|                  1|               6|            21|
|   125|     M|         1|       11|     null|           null|   

In [None]:
# (aggregated column, short feature name) pairs; list order = output column order.
feature_renames = [
    ('gender', 'gender'),
    ('max(free)', 'free'),
    ('max(paid)', 'paid'),
    ('max(Add Friend)', 'add_friend'),
    ('max(Add to Playlist)', 'add_playlist'),
    ('max(Downgrade)', 'd_pge_access'),
    ('max(NextSong)', 'num_music'),
    ('max(Submit Downgrade)', 'num_downgrade'),
    ('max(Submit Upgrade)', 'num_upgrade'),
    ('max(Thumbs Down)', 'dislike'),
    ('max(Thumbs Up)', 'like'),
    ('max(churn)', 'churn'),
]

# Select in order, rename positionally, and treat missing (null) counts as 0.
features = df_4.select([src for src, _ in feature_renames]) \
               .toDF(*[dst for _, dst in feature_renames]) \
               .fillna(0)

features.show()

+------+----+----+----------+------------+------------+---------+-------------+-----------+-------+----+-----+
|gender|free|paid|add_friend|add_playlist|d_pge_access|num_music|num_downgrade|num_upgrade|dislike|like|churn|
+------+----+----+----------+------------+------------+---------+-------------+-----------+-------+----+-----+
|     F| 381|   0|         4|           7|           0|      275|            0|          0|      5|  17|    0|
|     M| 120| 354|         4|           8|           5|      387|            0|          1|      6|  21|    0|
|     M|  11|   0|         0|           0|           0|        8|            0|          0|      0|   0|    1|
|     F|   0|4825|        74|         118|          41|     4079|            0|          0|     41| 171|    0|
|     M|   0|2464|        28|          52|          23|     2111|            0|          0|     21| 100|    1|
|     M| 201|   0|         1|           5|           0|      150|            0|          0|      1|   7|    0|
|

In [None]:
# Keep the per-user feature table around: it is reused by the counts below, the
# preprocessing fit, the random split and (through that split) every CV fit.
features.persist()

DataFrame[gender: string, free: bigint, paid: bigint, add_friend: bigint, add_playlist: bigint, d_pge_access: bigint, num_music: bigint, num_downgrade: bigint, num_upgrade: bigint, dislike: bigint, like: bigint, churn: int]

In [None]:
# Feature-table shape, then the number of churned (1) vs. retained (0) users.
n_users = features.count()
n_churned = features.filter(features['churn'] == 1).count()
n_retained = features.filter(features['churn'] == 0).count()

print(n_users, len(features.columns))
print(n_churned, n_retained)

225 12
92 133


# Apply Model

In [None]:
from pyspark.ml.feature import StringIndexer

# Encode the categorical gender column as a numeric index for the feature vector.
indexer = StringIndexer().setInputCol("gender").setOutputCol("gender_index")

In [None]:
from pyspark.ml.feature import VectorAssembler

# Indexed gender plus every count column, packed into one vector column.
assembler_inputs = ['gender_index', 'free', 'paid', 'add_friend', 'add_playlist', 'd_pge_access',
                    'num_music', 'num_downgrade', 'num_upgrade', 'dislike', 'like']
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [None]:
from pyspark.ml.feature import MinMaxScaler

# Rescale each feature to a common range so counts of very different size are comparable.
scaler = MinMaxScaler().setInputCol("features").setOutputCol("scaledFeatures")

In [None]:
from pyspark.ml import Pipeline

# Fit the preprocessing stages on all users; the scaled output feeds the PCA
# exploration in the next cell.
scaled_pipeline = Pipeline().setStages([indexer, assembler, scaler])
scaled_model = scaled_pipeline.fit(features)
scaled_result = scaled_model.transform(features)

In [None]:
from pyspark.ml.feature import PCA

# Fit PCA with all 11 components to see how much variance each one explains.
pca = PCA(inputCol="scaledFeatures", outputCol="pcaFeatures").setK(11)
pca_model = pca.fit(scaled_result)
pca_model.explainedVariance

DenseVector([0.5248, 0.3193, 0.0822, 0.0283, 0.021, 0.0113, 0.0074, 0.0037, 0.0015, 0.0005, 0.0])

In [None]:
# Hold out ~25% of users for final evaluation; the other ~75% are used for cross-validation.
SPLIT_SEED = 42
rest, validation = features.randomSplit([0.75, 0.25], seed=SPLIT_SEED)

In [None]:
# Dimension reduction for the baseline: keep the first 3 principal components.
# The explained variance printed above (about 0.52 + 0.32 + 0.08) shows they carry most of the variance.
pca = PCA(inputCol="scaledFeatures", outputCol="pcaFeatures", k=3)

In [None]:
from pyspark.ml.classification import LogisticRegression

# Baseline estimator: logistic regression on the PCA components.
lr = LogisticRegression(labelCol='churn', featuresCol='pcaFeatures', maxIter=10)

In [None]:
# Baseline pipeline: preprocessing -> PCA -> logistic regression (tuned below).
base_stages = [indexer, assembler, scaler, pca, lr]
base_pipeline = Pipeline(stages=base_stages)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder

# 5 x 5 grid over regularization strength and the L1/L2 mix.
# NOTE: when regParam == 0.0 there is no penalty, so the five elasticNetParam
# values paired with it produce equivalent models.
reg_params = [0.0, 0.3, 0.5, 0.7, 0.9]
elastic_net_params = [0.0, 0.2, 0.5, 0.8, 1]

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, reg_params)
    .addGrid(lr.elasticNetParam, elastic_net_params)
    .build()
)

In [None]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 5-fold CV over the baseline pipeline, scored by F1 (the evaluator's default
# metric, spelled out here); up to 4 param maps are evaluated in parallel.
crossval = CrossValidator(
    estimator=base_pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(labelCol='churn', metricName='f1'),
    numFolds=5,
    parallelism=4,
)

In [None]:
# Tune on the CV split, then score the held-out users with the best pipeline.
base_model = crossval.fit(rest)
results = base_model.transform(validation)

In [None]:
# Sanity check: transform() returns an ordinary Spark DataFrame (it carries the
# 'prediction' column evaluated below).
type(results)

pyspark.sql.dataframe.DataFrame

In [None]:
# Evaluator for the held-out results; 'f1' is its default metric, written out for clarity.
evaluator = MulticlassClassificationEvaluator(labelCol='churn', metricName='f1')

In [None]:
# Held-out F1 of the tuned baseline, plus the winning hyperparameters.
print('F1 score: {}'.format(evaluator.evaluate(results.select(['prediction', 'churn']))))

# Recover the winning param map through the public CrossValidatorModel API rather
# than the private `_java_obj` handle. F1 is larger-is-better, so the winner is the
# map with the highest average CV metric (max() keeps the first one on ties).
best_idx = max(range(len(base_model.avgMetrics)), key=base_model.avgMetrics.__getitem__)
best_params = base_model.getEstimatorParamMaps()[best_idx]

# float() so grid values written as ints (e.g. elasticNetParam 1) print as floats.
print('Best parameter for regParam: {}'.format(float(best_params[lr.regParam])))
print('Best parameter for elasticNetParam: {}'.format(float(best_params[lr.elasticNetParam])))

F1 score: 0.8003680121502423
Best parameter for regParam: 0.0
Best parameter for elasticNetParam: 0.0


In [None]:
# Gradient Boosted Tree for second estimator (trained on the scaled features, no PCA)

from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(featuresCol="scaledFeatures", labelCol="churn", maxIter=10)
gbt_pipeline = Pipeline(stages=[indexer, assembler, scaler, gbt])

gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5, 10, 15]) \
    .build()

crossval_gbt = CrossValidator(estimator=gbt_pipeline,
                              estimatorParamMaps=gbt_paramGrid,
                              evaluator=MulticlassClassificationEvaluator(labelCol='churn'),
                              numFolds=5,
                              parallelism=4)

gbt_model = crossval_gbt.fit(rest)

results_gbt = gbt_model.transform(validation)

# Winning param map via the public API (F1 is larger-is-better; first max on ties)
# instead of the private `_java_obj` handle.
gbt_best_idx = max(range(len(gbt_model.avgMetrics)), key=gbt_model.avgMetrics.__getitem__)
gbt_best_params = gbt_model.getEstimatorParamMaps()[gbt_best_idx]

# Results of GBT model's F1 score, and its best hyperparameter
print('F1 score: {}'.format(evaluator.evaluate(results_gbt.select(['prediction', 'churn']))))
print('Best parameter for maxDepth: {}'.format(gbt_best_params[gbt.maxDepth]))

F1 score: 0.7781448388503351
Best parameter for maxDepth: 5


In [None]:
# Linear SVC model for the final estimator

from pyspark.ml.classification import LinearSVC

# LinearSVC reads the scaled features directly, so the pipeline has no PCA stage.
# A PCA stage here would be fitted in every fold but its output never read.
# To train on principal components like the baseline, insert `pca` before `svm`
# and set featuresCol='pcaFeatures'.
svm = LinearSVC(featuresCol='scaledFeatures', labelCol='churn', maxIter=10)
svm_pipeline = Pipeline(stages=[indexer, assembler, scaler, svm])

svm_paramGrid = ParamGridBuilder() \
    .addGrid(svm.regParam, [0.01, 0.05]) \
    .addGrid(svm.tol, [0.000001, 0.00001, 0.0001]) \
    .build()

crossval_svm = CrossValidator(estimator=svm_pipeline,
                              estimatorParamMaps=svm_paramGrid,
                              evaluator=MulticlassClassificationEvaluator(labelCol='churn'),
                              numFolds=5,
                              parallelism=4)

svm_model = crossval_svm.fit(rest)

results_svm = svm_model.transform(validation)

# Winning param map via the public API (F1 is larger-is-better; first max on ties)
# instead of the private `_java_obj` handle.
svm_best_idx = max(range(len(svm_model.avgMetrics)), key=svm_model.avgMetrics.__getitem__)
svm_best_params = svm_model.getEstimatorParamMaps()[svm_best_idx]

# Results of the Linear SVC model's F1 score, and its best hyperparameters
print('F1 score: {}'.format(evaluator.evaluate(results_svm.select(['prediction', 'churn']))))
print('Best parameter for regParam: {}'.format(svm_best_params[svm.regParam]))
print('Best parameter for tol: {}'.format(svm_best_params[svm.tol]))

F1 score: 0.7781448388503351
Best parameter for regParam: 0.01
Best parameter for tol: 1e-06


In [None]:
# Naive Bayes model for the third estimator
# NOTE(review): this whole cell is disabled (every line is commented out), so no
# Naive Bayes model is trained or reported in this notebook. It is kept for
# reference. Unlike the other CV cells it uses parallelism=5.
 
# from pyspark.ml.classification import NaiveBayes
 
# nb = NaiveBayes(featuresCol='scaledFeatures', labelCol='churn', modelType='multinomial')
# nb_pipeline = Pipeline(stages=[indexer, assembler, scaler, nb])
 
# nb_paramGrid = ParamGridBuilder() \
#     .addGrid(nb.smoothing, [0, 0.5, 1]) \
#     .build()
 
# crossval_nb = CrossValidator(estimator=nb_pipeline,
#                              estimatorParamMaps=nb_paramGrid,
#                              evaluator=MulticlassClassificationEvaluator(labelCol='churn'),
#                              numFolds=5,
#                              parallelism=5)
 
# nb_model = crossval_nb.fit(rest)
 
# results_nb = nb_model.transform(validation)
 
# # Results of Naive Bayes model's F1 score, and its best hyperparameter
# print('F1 score: {}'.format(evaluator.evaluate(results_nb.select(['prediction', 'churn']))))
# print('Best parameter for Smoothing: {}'.format(nb_model.bestModel.stages[-1]._java_obj.parent().getSmoothing()))