# Sparkify Churn Prediction - Spark - AWS EMR Deployment

This project uses roughly **26 million** user activity records (26,259,199 rows) to predict user churn, that is, to identify the users most likely to stop using the service. The full **12GB dataset** is read from **AWS S3** and processed on an **AWS EMR cluster**. **Spark** handles data wrangling, feature engineering, model training, and hyperparameter tuning.

### Connect to AWS EMR cluster

In [1]:
# Start Spark Session

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 6 | application_1697312914837_0008 | pyspark | idle | Link | Link | ✔ |

SparkSession available as 'spark'.

In [2]:
# Spark SQL
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import (avg, col, concat, desc, lit, min, max, split,
                                   udf, countDistinct, sum, count, isnan, when,
                                   sort_array, asc)
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DoubleType

# Spark ML
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

# Python
import numpy as np

import warnings

warnings.filterwarnings("ignore")


### Load 12GB full data from AWS S3

In [3]:
# SparkSession is the entry point for interacting with the Spark cluster.
# Spark runs on the JVM (it is written in Scala and Java), so a Java runtime must be available.
# On EMR a session already exists, so getOrCreate() below returns it.

## Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

## Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()


Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [4]:
df.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
print('Total activity records:', df.count())
print('Total columns:', len(df.columns))


Total activity records: 26259199
Total columns: 18

In [6]:
# Summarize `auth` (Login status of the user)
df.groupby('auth').agg(F.countDistinct('userId')).show()


+----------+-------------+
|      auth|count(userId)|
+----------+-------------+
|     Guest|            1|
|Logged Out|            1|
| Cancelled|         5003|
| Logged In|        22277|
+----------+-------------+

### Clean data

In [7]:
# Keep only 'Logged In' and 'Cancelled' events; this drops Guest and Logged Out activity
df_clean = df.filter(col('auth').isin('Logged In', 'Cancelled'))
# Exclude rows with an empty userId
df_clean = df_clean.filter(col('userId') != '')
df_clean.groupby('auth').agg(F.countDistinct('userId')).show()


+---------+-------------+
|     auth|count(userId)|
+---------+-------------+
|Cancelled|         5003|
|Logged In|        22277|
+---------+-------------+

In [8]:
# Check that no empty userId values remain
df_clean.filter(df_clean.userId == '').count()


0

In [9]:
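# Note: sessionId is a long, so a comparison with '' likely never matches; col('sessionId').isNull() would test for missing values
df_clean.filter(df_clean.sessionId == '').count()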


0

## Exploratory Data Analysis
Churn is defined from the `auth` column (the login status of the user). During data cleaning we saw that the `Cancelled` status marks users who cancelled their account, so it is used as the churn indicator. In the small dataset, the overall churn rate is 23%.
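
As a rough check on the full dataset, the distinct-user counts from the `auth` summary earlier can be turned into a churn rate. This is a minimal sketch in plain Python; it assumes every user with a `Cancelled` event is also among the 22,277 `Logged In` users, and the variable names are illustrative only.

```python
# Distinct user counts taken from the `auth` summary of the full dataset
cancelled_users = 5003
logged_in_users = 22277  # assumption: includes every user who later cancelled

churn_rate = cancelled_users / logged_in_users
print(f"Churn rate in the full dataset: {churn_rate:.1%}")
```

Under that assumption the full dataset gives about 22.5%, close to the 23% seen in the small dataset.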

In [10]:
# Binary churn indicator `cancel`: 1 if the event has auth == 'Cancelled', else 0
df_clean = df_clean.withColumn('cancel', when(col('auth') == 'Cancelled', 1).otherwise(0))


In [11]:
pages = df_clean.groupby('page','method').count().sort('method')
pages.show()


+--------------------+------+--------+
|                page|method|   count|
+--------------------+------+--------+
|Cancellation Conf...|   GET|    5003|
|            Settings|   GET|  147074|
|               Error|   GET|   25048|
|                Help|   GET|  129448|
|                Home|   GET|  933124|
|               About|   GET|   48377|
|         Roll Advert|   GET|  385212|
|             Upgrade|   GET|   50507|
|           Downgrade|   GET|  184240|
|              Logout|   PUT|  296005|
|           Thumbs Up|   PUT| 1151465|
|    Submit Downgrade|   PUT|    6494|
|          Add Friend|   PUT|  381664|
|      Submit Upgrade|   PUT|   15135|
|              Cancel|   PUT|    5003|
|       Save Settings|   PUT|   29516|
|            NextSong|   PUT|20850272|
|         Thumbs Down|   PUT|  239212|
|     Add to Playlist|   PUT|  597921|
+--------------------+------+--------+

## Feature Engineering

In [12]:
df_feat = df_clean


In [13]:
# Calculate user age: days from the registration date to the activity date
df_feat = df_feat.withColumn("date", F.to_date(F.from_unixtime(col('ts')/lit(1000))))
df_feat = df_feat.withColumn('date_register', F.to_date(F.from_unixtime(col('registration')/lit(1000))))
df_feat = df_feat.withColumn('user_age', F.datediff(col('date'), col('date_register')))
df_feat.describe('user_age').show()


+-------+-----------------+
|summary|         user_age|
+-------+-----------------+
|  count|         25480720|
|   mean| 65.7917132639894|
| stddev|41.19433609714087|
|    min|              -23|
|    max|              411|
+-------+-----------------+
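
The `user_age` calculation can be sketched in plain Python to make the unit handling explicit: `ts` and `registration` are epoch milliseconds, so they are divided by 1000 before conversion to dates. The helper name `user_age_days` is hypothetical, and the sketch assumes UTC, whereas Spark's `from_unixtime` uses the session time zone.

```python
from datetime import datetime, timezone

def user_age_days(ts_ms, registration_ms):
    """Days between the registration date and the event date (inputs in epoch milliseconds)."""
    event_date = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).date()
    register_date = datetime.fromtimestamp(registration_ms / 1000, tz=timezone.utc).date()
    return (event_date - register_date).days

# Values from the first row of the dataset shown earlier
print(user_age_days(1538352001000, 1533734541000))  # 54
```

An event timestamped before the user's registration yields a negative value, which is how a minimum such as -23 can appear in the summary.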

In [14]:
# Get proactive user interactions except for playing songs
inter = ['Thumbs Up', 'Thumbs Down', 'Add to Playlist', 'Add Friend']
get_interactions = udf(lambda x: 1 if x in inter else 0, IntegerType())

# Get negative feedback from user
thumbs_down = udf(lambda x: 1 if x == 'Thumbs Down' else 0, IntegerType())

df_feat = df_feat.withColumn('interactions', get_interactions('page'))
df_feat = df_feat.withColumn('thumbs_down', thumbs_down('page'))


In [15]:
# Create `downgraded`: 1 for each 'Submit Downgrade' event (aggregated later to flag users who ever downgraded)
downgraded = udf(lambda x: 1 if x == 'Submit Downgrade' else 0, IntegerType())
df_feat = df_feat.withColumn('downgraded', downgraded('page'))

# Create `paid`: 1 for each event at the 'paid' level (aggregated later to flag users who were ever paid users)
paid = udf(lambda x: 1 if x == 'paid' else 0, IntegerType())
df_feat = df_feat.withColumn('paid', paid('level'))

# Create `songs`: 1 for each 'NextSong' event (summed later to count songs played)
NextSong = udf(lambda x: 1 if x == 'NextSong' else 0, IntegerType())
df_feat = df_feat.withColumn('songs', NextSong('page'))

# Convert gender to 1/0. 1 = Female
female = udf(lambda x: 1 if x == 'F' else 0, IntegerType())
df_feat = df_feat.withColumn('female', female('gender'))


In [16]:
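# User-level dimensions: max() over the 0/1 event flags marks users who ever paid, downgraded, or cancelled
user_dims = df_feat.groupby('userId','female','date_register').agg(\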
                                                          max('user_age').alias('user_age'),\
                                                          max('paid').alias('paid_user'),\
                                                          max('downgraded').alias('downgraded_user'),\
                                                          max('cancel').alias('canceled'))


In [17]:
# Inspect Dimensions
user_dims.take(1)


[Row(userId='1736302', female=1, date_register=datetime.date(2018, 6, 18), user_age=162, paid_user=1, downgraded_user=0, canceled=0)]

In [18]:
# Aggregate measures at the userId and sessionId level
measures = df_feat.groupby('userId','sessionId').agg(\
                                                      min('date').alias('start_date'),\
                                                      max('date').alias('end_date'),\
                                                      sum('interactions').alias('interactions'),\
                                                      sum('thumbs_down').alias('thumbs_down'),\
                                                      sum('songs').alias('songs'),\
                                                      sum('length').alias('length'),\
                                                      countDistinct('artist').alias('c_artist'))
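
The flag-then-aggregate pattern used above (mark each event with 0/1 indicators, then sum them per session) can be illustrated without Spark. The sketch below is plain Python on a handful of hypothetical events; `aggregate_sessions` and `INTERACTION_PAGES` are illustrative names, not part of the notebook.

```python
from collections import defaultdict

# Pages counted as proactive interactions, as in the `inter` list above
INTERACTION_PAGES = {'Thumbs Up', 'Thumbs Down', 'Add to Playlist', 'Add Friend'}

def aggregate_sessions(events):
    """Group event dicts by (userId, sessionId) and total the per-event 0/1 flags."""
    sessions = defaultdict(lambda: {'interactions': 0, 'thumbs_down': 0, 'songs': 0})
    for e in events:
        s = sessions[(e['userId'], e['sessionId'])]
        s['interactions'] += int(e['page'] in INTERACTION_PAGES)
        s['thumbs_down'] += int(e['page'] == 'Thumbs Down')
        s['songs'] += int(e['page'] == 'NextSong')
    return dict(sessions)

# Hypothetical events for illustration only
events = [
    {'userId': 'u1', 'sessionId': 1, 'page': 'NextSong'},
    {'userId': 'u1', 'sessionId': 1, 'page': 'Thumbs Down'},
    {'userId': 'u1', 'sessionId': 2, 'page': 'NextSong'},
    {'userId': 'u2', 'sessionId': 3, 'page': 'Add Friend'},
]
per_session = aggregate_sessions(events)
print(per_session[('u1', 1)])
```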


In [19]:
# Compute the gap in days between consecutive sessions, then
# aggregate the session measures to the userId level
measures.createOrReplaceTempView("measures_temp")
measures_agg = spark.sql(
"""
    SELECT
        userId AS userId,
        COUNT(sessionId) AS total_session,
        AVG(interactions) AS interactions,
        AVG(thumbs_down) AS thumbs_down,
        AVG(songs) AS songs,
        AVG(length) AS length,
        AVG(c_artist) AS artists,
        AVG(session_gap) AS session_gap
    FROM (
        SELECT
            *,
            LAG(end_date) OVER (PARTITION BY userId ORDER BY start_date) AS prev_end_date,
            DATEDIFF(start_date, LAG(end_date) OVER (PARTITION BY userId ORDER BY start_date)) AS session_gap
        FROM
            measures_temp
    ) AS temp
    GROUP BY userId
""")



In [20]:
# Inspect measures
measures_agg.take(1)


[Row(userId='1001393', total_session=7, interactions=11.142857142857142, thumbs_down=0.8571428571428571, songs=70.57142857142857, length=17726.613365714285, artists=68.14285714285714, session_gap=9.5)]
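
The `LAG` window logic behind `session_gap` can be sketched in plain Python for a single user: order the sessions by start date, subtract the previous session's end date from each start date, and average the differences. The function name `average_session_gap` and the dates are hypothetical.

```python
from datetime import date

def average_session_gap(sessions):
    """sessions: list of (start_date, end_date) for one user. Returns the mean gap in days, or None."""
    ordered = sorted(sessions, key=lambda s: s[0])
    # Gap = current start minus previous end; the first session has no predecessor
    gaps = [(cur[0] - prev[1]).days for prev, cur in zip(ordered, ordered[1:])]
    return sum(gaps) / len(gaps) if gaps else None

# Hypothetical sessions for illustration only
sessions = [
    (date(2018, 10, 1), date(2018, 10, 1)),
    (date(2018, 10, 11), date(2018, 10, 12)),
    (date(2018, 10, 21), date(2018, 10, 21)),
]
print(average_session_gap(sessions))  # 9.5
```

A user with a single session has no gap at all, so the function returns `None`, mirroring how SQL `AVG` returns `NULL` when every input is `NULL`.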

In [21]:
# Inspect user_dim and measure shape
print('user_dims: ', user_dims.count(),' measure_agg: ', measures_agg.count())


user_dims:  22277  measure_agg:  22277

In [22]:
# Join dims and measures_agg
df_model = user_dims.join(measures_agg, on = 'userId')


## Modeling

In [23]:
df_model.printSchema()


root
 |-- userId: string (nullable = true)
 |-- female: integer (nullable = true)
 |-- date_register: date (nullable = true)
 |-- user_age: integer (nullable = true)
 |-- paid_user: integer (nullable = true)
 |-- downgraded_user: integer (nullable = true)
 |-- canceled: integer (nullable = true)
 |-- total_session: long (nullable = false)
 |-- interactions: double (nullable = true)
 |-- thumbs_down: double (nullable = true)
 |-- songs: double (nullable = true)
 |-- length: double (nullable = true)
 |-- artists: double (nullable = true)
 |-- session_gap: double (nullable = true)

In [24]:
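# Assemble the numeric features into a single vector column.
# handleInvalid='skip' drops rows with null values, e.g. users with a single
# session, whose session_gap is null.
assembler = VectorAssembler(inputCols=['female', 'user_age', 'paid_user',
                                       'downgraded_user', 'total_session', 'interactions',
                                       'thumbs_down', 'length', 'session_gap'],
                            outputCol='NumFeatures', handleInvalid='skip')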
data = assembler.transform(df_model)

# Use 'canceled' as model prediction label
data = data.withColumnRenamed('canceled','label')


In [39]:
# Class weights to counter class imbalance: each class is weighted by the other class's share
balancingRatio = data.filter(col('label') == 1).count() / data.count()
calculateWeights = udf(lambda x: balancingRatio if x == 0 else 1.0 - balancingRatio, DoubleType())

data = data.withColumn("classWeightCol", calculateWeights('label'))
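
To see why this weighting balances the classes, the arithmetic can be worked through in plain Python. The counts below are hypothetical (25 churned users out of 100), and `class_weights` is an illustrative helper that mirrors the `balancingRatio` logic above.

```python
def class_weights(n_positive, n_total):
    """Weight each class by the other class's share of the data."""
    ratio = n_positive / n_total          # share of the positive (churned) class
    return {0: ratio, 1: 1.0 - ratio}     # majority class gets the smaller weight

# Hypothetical counts: 25 churned users out of 100
n_positive, n_total = 25, 100
weights = class_weights(n_positive, n_total)

# Total weight carried by each class after weighting
total_weight_negative = (n_total - n_positive) * weights[0]
total_weight_positive = n_positive * weights[1]
print(weights, total_weight_negative, total_weight_positive)
```

Both classes end up with the same total weight, so the minority class contributes as much to the loss as the majority class.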


In [40]:
# Split into training (80%) and validation (20%) sets; note that the training split is named `test` here
test, validation = data.randomSplit([0.8, 0.2], seed=42)
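
A seeded random split assigns each row to a split at random, so the 80/20 proportions hold only approximately while the result stays reproducible for a fixed seed. The sketch below illustrates that idea in plain Python; it is not Spark's implementation, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one of two splits with probability proportional to the weights."""
    rng = random.Random(seed)
    threshold = weights[0] / sum(weights)  # weights are normalized, e.g. [0.8, 0.2]
    first, second = [], []
    for row in rows:
        (first if rng.random() < threshold else second).append(row)
    return first, second

rows = list(range(1000))
train_rows, validation_rows = random_split(rows, [0.8, 0.2], seed=42)
print(len(train_rows), len(validation_rows))
```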


In [30]:
# Standard Scaler
standardscaler = StandardScaler(inputCol="NumFeatures", outputCol="features", withMean=True, withStd=True)
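
With `withMean=True` and `withStd=True`, each feature is centered and scaled to unit standard deviation. A minimal plain-Python sketch of that transformation for one feature follows; it assumes the sample standard deviation is used (which is our understanding of Spark's scaler), and the input values are hypothetical.

```python
import statistics

def standardize(values):
    """Center values to mean 0 and scale them to unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1 in the denominator)
    return [(v - mean) / std for v in values]

# Hypothetical feature values
scaled = standardize([10.0, 20.0, 30.0, 40.0])
print(scaled)
```

Standardizing puts features such as `user_age` (days) and `length` (seconds) on a comparable scale, which matters for models like logistic regression.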



In [27]:
def fit_model(model, paramGrid=None):
    """
    Input: model, paramGrid (optional)
    Output: fitted model, predictions on the validation set
    """
    # Note: `test` holds the 80% training split
    pipeline = Pipeline(stages=[standardscaler, model])

    if paramGrid is not None:
        # 3-fold cross-validation; the evaluator's default metric is F1
        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=MulticlassClassificationEvaluator(),
                                  numFolds=3)
        fitmodel = crossval.fit(test)
    else:
        fitmodel = pipeline.fit(test)

    results = fitmodel.transform(validation)

    return fitmodel, results


In [28]:
# Evaluate the model on the validation set
def val_evaluation(results):
    """
    Input: prediction results
    Output: f1, accuracy, precision and recall scores
    """
    predictionAndLabels = results.select(['prediction', 'label']) \
        .withColumn('label', col('label').cast(DoubleType())).rdd

    metrics = MulticlassMetrics(predictionAndLabels)
    # Confusion matrix as a numpy array: rows are actual labels, columns are predictions
    cm = metrics.confusionMatrix().toArray()

    # accuracy: (TP+TN)/Total Predictions
    # precision: TP/(TP + FP)
    # recall: TP/(TP + FN)
    # f1: the evaluator's default metric, a weighted F1 averaged over both classes,
    #     so it can differ from 2*(Recall * Precision) / (Recall + Precision)
    accuracy = (cm[0][0] + cm[1][1]) / cm.sum()
    precision = cm[1][1] / (cm[0][1] + cm[1][1])
    recall = cm[1][1] / (cm[1][0] + cm[1][1])
    f1 = MulticlassClassificationEvaluator().evaluate(results)

    return round(f1, 2), round(accuracy, 2), round(precision, 2), round(recall, 2)
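
The metric formulas in `val_evaluation` can be checked by hand on a small confusion matrix. The plain-Python sketch below uses hypothetical counts and an illustrative helper, `binary_metrics`; it also computes a label-frequency-weighted F1, which is our understanding of what `MulticlassClassificationEvaluator` reports by default, to show that it generally differs from the positive-class F1.

```python
def binary_metrics(cm):
    """cm = [[TN, FP], [FN, TP]]: rows are actual labels, columns are predictions."""
    (tn, fp), (fn, tp) = cm
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1_positive = 2 * precision * recall / (precision + recall)

    # F1 of the negative class, needed for the weighted average
    precision_neg = tn / (tn + fn)
    recall_neg = tn / (tn + fp)
    f1_negative = 2 * precision_neg * recall_neg / (precision_neg + recall_neg)

    # Weight each class's F1 by its share of the actual labels
    weighted_f1 = ((tn + fp) * f1_negative + (fn + tp) * f1_positive) / total
    return {'accuracy': accuracy, 'precision': precision, 'recall': recall,
            'f1_positive': f1_positive, 'weighted_f1': weighted_f1}

# Hypothetical confusion matrix: 100 actual negatives, 50 actual positives
scores = binary_metrics([[90, 10], [5, 45]])
for name, value in scores.items():
    print(f"{name}: {value:.3f}")
```

In this example the positive-class F1 is lower than the weighted F1, because the larger negative class is predicted more accurately and dominates the weighted average.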


### Random Forest Tuning

In [31]:
rf = RandomForestClassifier()
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth,[10,20])\
    .addGrid(rf.numTrees,[20,30])\
    .build()
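
The grid above is the Cartesian product of the listed values, so two depths and two tree counts give four candidate settings, each fitted once per fold. The sketch below reproduces that bookkeeping in plain Python; the metric values are hypothetical, and the ordering of combinations in Spark's grid may differ from `itertools.product`.

```python
from itertools import product

max_depths = [10, 20]
num_trees = [20, 30]

# Every combination of the two hyperparameters
grid = [{'maxDepth': d, 'numTrees': n} for d, n in product(max_depths, num_trees)]

num_folds = 3
cv_fits = len(grid) * num_folds  # model fits performed during cross-validation

# Hypothetical average cross-validation metric for each grid entry
avg_metrics = [0.84, 0.85, 0.86, 0.88]
best_index = max(range(len(grid)), key=lambda i: avg_metrics[i])
print(len(grid), cv_fits, grid[best_index])
```

Keeping the grid small matters on a dataset of this size, since every extra value multiplies the number of fits.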


In [32]:
rfmodel, rfresults = fit_model(rf, paramGrid = rf_paramGrid)


In [33]:
# Index of the grid entry with the best average cross-validation metric
brf_index = int(np.argmax(rfmodel.avgMetrics))
brf = rf_paramGrid[brf_index]
print(brf)


{Param(parent='RandomForestClassifier_8d9332fad231', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 20, Param(parent='RandomForestClassifier_8d9332fad231', name='numTrees', doc='Number of trees to train (>= 1).'): 30}

In [34]:
print("RandomForest: f1, accuracy,precision,recall", val_evaluation(rfresults))


RandomForest: f1, accuracy,precision,recall (0.88, 0.97, 0.97, 0.87)

In [35]:
# Extract feature importance from rfmodel
featImportances = np.array(rfmodel.bestModel.stages[-1].featureImportances)


In [37]:
# Extract feature names from the original data
dict_feats = data.schema['NumFeatures'].metadata['ml_attr']['attrs']['numeric']
list_feats = np.array([x['name'] for x in dict_feats])
print(list_feats,featImportances)


['female' 'user_age' 'paid_user' 'downgraded_user' 'total_session'
 'interactions' 'thumbs_down' 'length' 'session_gap'] [0.02346042 0.26185797 0.01178645 0.01482874 0.1897405  0.08458735
 0.08593863 0.08390108 0.24389885]
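
The two arrays are easier to read when paired and sorted. A small plain-Python sketch using the values printed above:

```python
features = ['female', 'user_age', 'paid_user', 'downgraded_user', 'total_session',
            'interactions', 'thumbs_down', 'length', 'session_gap']
importances = [0.02346042, 0.26185797, 0.01178645, 0.01482874, 0.1897405,
               0.08458735, 0.08593863, 0.08390108, 0.24389885]

# Pair each feature with its importance and sort from most to least important
ranked = sorted(zip(features, importances), key=lambda pair: pair[1], reverse=True)
for name, score in ranked:
    print(f"{name:16s} {score:.3f}")
```

Sorted this way, `user_age`, `session_gap`, and `total_session` lead the ranking and together account for roughly 70% of the total importance.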

### Logistic Regression Tuning

In [41]:
# Logistic regression with class weights (no parameter grid, so no cross-validation)
lr_w = LogisticRegression(maxIter=10, weightCol = 'classWeightCol')
lrmodel_w, lrresults_w = fit_model(lr_w)



In [42]:
print("LogisticRegression with class weight: f1,accuracy,precision,recall", val_evaluation(lrresults_w))


LogisticRegression with class weight: f1,accuracy,precision,recall (0.75, 0.74, 0.44, 0.78)

In [46]:
# Get coefficients (fitted on the standardized features)
lr_coef = lrmodel_w.stages[-1].coefficients
print(list_feats, lr_coef)


['female' 'user_age' 'paid_user' 'downgraded_user' 'total_session'
 'interactions' 'thumbs_down' 'length' 'session_gap'] [-0.018579235553713488,-0.4952864228478297,0.19108230985204852,-0.055479250750583006,-0.5741988216779174,-0.9857262013678154,0.5285386711079961,0.55382200631948,-1.887944882574658]
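
One way to read these coefficients is to rank them by magnitude and convert them to odds ratios. The plain-Python sketch below uses the printed values rounded to four decimals. Because the pipeline standardizes the features, each coefficient is assumed to describe the effect of a one-standard-deviation increase in that feature.

```python
import math

# Coefficients from the output above, rounded to four decimals
coefficients = {
    'female': -0.0186, 'user_age': -0.4953, 'paid_user': 0.1911,
    'downgraded_user': -0.0555, 'total_session': -0.5742, 'interactions': -0.9857,
    'thumbs_down': 0.5285, 'length': 0.5538, 'session_gap': -1.8879,
}

# exp(coefficient) is the multiplicative change in churn odds per unit increase
odds_ratios = {name: math.exp(c) for name, c in coefficients.items()}

# Rank features by the absolute size of their coefficient
ranked_by_magnitude = sorted(coefficients, key=lambda n: abs(coefficients[n]), reverse=True)
for name in ranked_by_magnitude:
    print(f"{name:16s} coef={coefficients[name]:+.4f} odds_ratio={odds_ratios[name]:.3f}")
```

An odds ratio below 1 (negative coefficient) is associated with lower churn odds in this model, and a ratio above 1 with higher churn odds.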

#### Note: 

Detailed visualizations, exploratory data analysis, and model selection are documented in `sparkify_mini_data_exploration.ipynb`. This notebook focuses on processing the full 12GB dataset on AWS EMR, so data exploration is kept brief.