# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. 

In [61]:
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, lag
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.window import Window

import numpy as np
import pandas as pd

%matplotlib inline
import matplotlib.pyplot as plt

import re


In [6]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify Project") \
    .getOrCreate()

In [7]:
spark.version

'2.4.3'

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [4]:
#event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
#event_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"

file = 'mini_sparkify_event_data.json'
df = spark.read.json(file)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [5]:
print('total number of rows:' , df.count())

total number of rows: 286500


In [6]:
df.head(2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

In [7]:
# clean rows with empty userId
df = df.filter("userId <> ''")

In [8]:
# number of distinct users
n_users = df.select("userId").distinct().count()

print("number of distinct users: ", n_users)

number of distinct users:  225


# Exploratory Data Analysis


### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [66]:
# type of subscription
df.select("level").distinct().show()


In [14]:
# types of pages that users visit
df.select("page").distinct().show(truncate=False)

+-------------------------+
|page                     |
+-------------------------+
|Cancel                   |
|Submit Downgrade         |
|Thumbs Down              |
|Home                     |
|Downgrade                |
|Roll Advert              |
|Logout                   |
|Save Settings            |
|Cancellation Confirmation|
|About                    |
|Settings                 |
|Add to Playlist          |
|Add Friend               |
|NextSong                 |
|Thumbs Up                |
|Help                     |
|Upgrade                  |
|Error                    |
|Submit Upgrade           |
+-------------------------+



In [20]:
df.filter("userId = '100010'").select(["userId", "page", "artist"]).sort(["ts"]).show()

+------+-----------+--------------------+
|userId|       page|              artist|
+------+-----------+--------------------+
|100010|   NextSong|Sleeping With Sirens|
|100010|   NextSong|Francesca Battist...|
|100010|   NextSong|              Brutha|
|100010|  Thumbs Up|                null|
|100010|   NextSong|         Josh Ritter|
|100010|   NextSong|               LMFAO|
|100010|   NextSong|         OneRepublic|
|100010|   NextSong|       Dwight Yoakam|
|100010|Roll Advert|                null|
|100010|   NextSong|      The Chordettes|
|100010|   NextSong|Coko featuring Ki...|
|100010|   NextSong|            The Cure|
|100010|Roll Advert|                null|
|100010|   NextSong|Kid Cudi Vs Crookers|
|100010|  Thumbs Up|                null|
|100010|   NextSong|            Yeasayer|
|100010|   NextSong|             Ben Lee|
|100010|Roll Advert|                null|
|100010|   NextSong|  ? & The Mysterians|
|100010|Roll Advert|                null|
+------+-----------+--------------------+

In [15]:
# user location
print('number of distinct locations: ',
      df.select('location').distinct().count())

number of distinct locations:  114


#### define user churn

In [16]:
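# define user churn
# Churn flags the event itself: 1 on 'Cancellation Confirmation' rows, 0 elsewhere
hasCancelled = udf(lambda x: 1 if x=='Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn('Churn', hasCancelled(df['page']))

# Churned propagates the flag to every row of a user who cancelled at any point
user_window = Window.partitionBy('userId')
df = df.withColumn('Churned', F.max('Churn').over(user_window))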

In [17]:
df.select(['userId']).filter("Churn=1").distinct().show(10)

+------+
|userId|
+------+
|   125|
|    51|
|    54|
|100014|
|   101|
|    29|
|100021|
|    87|
|    73|
|     3|
+------+
only showing top 10 rows



#### calculate churn rate

In [18]:
n_user_churn = df.select(['userId']).filter("Churn=1").distinct().count()

print("total users: ", n_users)
print("users churned: ",n_user_churn)
print("users not churned: ", (n_users - n_user_churn) )
print("churn rate: ", n_user_churn/n_users )


total users:  225
users churned:  52
users not churned:  173
churn rate:  0.2311111111111111
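
The churn definition and the rate above can be reproduced on a toy event log in plain Python. This is a minimal sketch rather than the notebook's Spark code: the `events` list and the helper `churned_users` are made up for illustration.

```python
# Toy event log of (userId, page) pairs. Values are illustrative, not from the dataset.
events = [
    ("1", "NextSong"), ("1", "Cancel"), ("1", "Cancellation Confirmation"),
    ("2", "NextSong"), ("2", "Thumbs Up"),
    ("3", "Home"), ("3", "NextSong"),
    ("4", "Downgrade"), ("4", "Cancellation Confirmation"),
]

def churned_users(events):
    """Return the users with at least one 'Cancellation Confirmation' event."""
    return {user for user, page in events if page == "Cancellation Confirmation"}

all_users = {user for user, _ in events}
churned = churned_users(events)
churn_rate = len(churned) / len(all_users)

print(sorted(churned))  # ['1', '4']
print(churn_rate)       # 0.5
```

With the counts reported above, the same ratio is 52 / 225, which is about 0.231.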


In [21]:
# create new column to count the number of songs played by the user
is_song = udf(lambda x: 1 if x=='NextSong' else 0, IntegerType())
df = df.withColumn('NextSong', is_song(df['page']))

In [22]:
# create new column to check if user has been in paid subscription
has_paid= udf(lambda x: 1 if x=='paid' else 0, IntegerType()) 
df = df.withColumn('hasPaid', has_paid('level'))

In [23]:
# create new column to flag visits to the 'Downgrade' page
has_downgrade =  udf(lambda x: 1 if x=='Downgrade' else 0, IntegerType()) 
df = df.withColumn('hasDowngrade', has_downgrade('page'))

#### info about the artists

In [24]:
artist_no_churned = np.array(df.filter('artist is not null and Churned=0').select('artist').collect())
artist_no_churned = set(artist_no_churned.flatten())

artist_churned = np.array(df.filter('artist is not null and Churned=1').select('artist').collect())
artist_churned = set(artist_churned.flatten())

artist_only_churn = artist_churned - artist_no_churned
artist_only_no_churn = artist_no_churned - artist_churned

print('distinct artist: ',
      '17655')
      #df.filter('artist is not null').select('artist').distinct().count())

print('distinct artist in churned users: ',
      len(artist_churned))

print('distinct artist not churned users: ',
      len(artist_no_churned))

print('artist only in churned: ', 
      len(artist_churned - artist_no_churned))

print('artist only in not churned: ',
     len(artist_no_churned - artist_churned))

#df = df.withColumn('artist_only_churn', col('artist').isin(artist_only_churn).cast('integer'))
#df = df.withColumn('artist_only_no_churn', col('artist').isin(artist_only_no_churn).cast('integer')) 

distinct artist:  17655
distinct artist in churned users:  8402
distinct artist not churned users:  16652
artist only in churned:  1003
artist only in not churned:  9253


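Users who have not churned listen to songs from more distinct artists (16,652 vs 8,402). Note that this group is also larger (173 vs 52 users), which accounts for part of the gap.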

#### general info aggregated by userId

In [25]:
# general info aggregated by userId
user = df.groupBy("userId").agg(
    F.max('location').alias('max_location'),
    F.max('gender').alias('max_gender'),
    F.max('hasPaid').alias('max_hasPaid'),
    F.max('hasDowngrade').alias('max_hasDowngrade'),
    F.max('Churn').alias('max_churn'),
    F.countDistinct('artist').alias('dist_artist')
#    F.collect_list("artist").alias("artist_list")
)

#user.show(5)

#### time between user sessions

In [26]:
# time between sessions

# rank in session
session_by_ts_win = Window.partitionBy(['userId','sessionId']).orderBy('ts')
df = df.withColumn('rank', F.rank().over(session_by_ts_win))

# time_diff
user_by_ts = Window.partitionBy(['userId']).orderBy('ts')
df = df.withColumn('ts_diff', col('ts') - F.lag('ts',1).over(user_by_ts))

# time diff between first page in current session - last page in previous session
time_btw_session = df.select(['userId','ts_diff'])\
                         .filter("rank=1 and ts_diff is not null")\
                         .groupBy('userId').agg(avg('ts_diff').alias('time_btw_sessions'))
#time_btw_session.show()
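
The logic of this cell (gap between the last event of one session and the first event of the next, averaged per user) can be sketched in plain Python. This is an illustrative sketch, not the notebook's implementation: the `events` list and the function name are hypothetical, and it assumes a user's sessions do not interleave in time.

```python
from collections import defaultdict

# Toy events of (userId, sessionId, ts in milliseconds). Illustrative values only.
events = [
    ("a", 1, 0), ("a", 1, 1_000), ("a", 1, 5_000),
    ("a", 2, 65_000), ("a", 2, 70_000),
    ("a", 3, 190_000),
    ("b", 7, 10_000), ("b", 7, 20_000),  # user "b" has a single session
]

def avg_time_between_sessions(events):
    """Average gap (ms) between the last event of one session and the first
    event of the next, per user. Users with a single session are omitted."""
    by_user = defaultdict(list)
    for user, session, ts in events:
        by_user[user].append((ts, session))

    result = {}
    for user, rows in by_user.items():
        rows.sort()  # order each user's events by timestamp
        gaps = [
            ts - prev_ts
            for (prev_ts, prev_session), (ts, session) in zip(rows, rows[1:])
            if session != prev_session  # this event opens a new session
        ]
        if gaps:
            result[user] = sum(gaps) / len(gaps)
    return result

gaps = avg_time_between_sessions(events)
print(gaps)  # {'a': 90000.0}
```

User "b" gets no value at all, which matters later when this table is joined with the other per-user tables.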

#### number of songs played and number of sessions, aggregated by user and session


In [27]:
# number of songs played and number of sessions
song_session = df.groupBy(['userId','sessionId'])\
                .agg(
                    F.sum('length'), 
                    F.sum('NextSong')\
                )\
                .groupBy('userId')\
                .agg(
                    F.avg('sum(length)').alias("avg_session_length"),
                    F.avg('sum(NextSong)').alias("avg_session_songs"),
                    F.sum('sum(length)').alias("total_session_length"),   
                    F.sum('sum(NextSong)').alias("total_songs_played"),
                    F.count('sum(length)').alias("total_sessions")
                )

#song_session.show()
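
The two-level aggregation (first per session, then per user) can be illustrated with a small plain-Python sketch. The data and variable names below are hypothetical. One difference from the Spark cell is worth noting: this sketch counts every session, whereas in Spark `F.count('sum(length)')` and `F.avg('sum(length)')` skip sessions whose summed length is null, that is, sessions without any song.

```python
from collections import defaultdict

# Toy events of (userId, sessionId, page, song length in seconds or None).
events = [
    ("a", 1, "NextSong", 200.0), ("a", 1, "Thumbs Up", None), ("a", 1, "NextSong", 100.0),
    ("a", 2, "NextSong", 300.0),
    ("b", 5, "Home", None), ("b", 5, "NextSong", 240.0),
]

# Level 1: totals per (user, session).
per_session = defaultdict(lambda: {"length": 0.0, "songs": 0})
for user, session, page, length in events:
    stats = per_session[(user, session)]
    if page == "NextSong":
        stats["songs"] += 1
    if length is not None:
        stats["length"] += length

# Level 2: averages and totals per user.
per_user = defaultdict(list)
for (user, _session), stats in per_session.items():
    per_user[user].append(stats)

summary = {
    user: {
        "avg_session_length": sum(s["length"] for s in sessions) / len(sessions),
        "avg_session_songs": sum(s["songs"] for s in sessions) / len(sessions),
        "total_songs_played": sum(s["songs"] for s in sessions),
        "total_sessions": len(sessions),
    }
    for user, sessions in per_user.items()
}
print(summary["a"])
```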

#### join dataframes with aggregated columns

This step joins 3 different dataframes that had to be computed in separate steps:
- data aggregated at the user level
- data aggregated at the user and session level
- time between sessions

In [28]:
joined_df = None
joined_df = user.join(song_session, ["userId"], "inner")\
                .join(time_btw_session, ["userId"], "inner")
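
An inner join keeps only the users present in every table, so any user without a time-between-sessions value drops out of the result. A minimal plain-Python sketch of that behaviour, with made-up tables and a hypothetical `inner_join` helper:

```python
# Per-user tables keyed by userId. Values are illustrative only.
user = {"a": {"gender": "F"}, "b": {"gender": "M"}, "c": {"gender": "F"}}
song_session = {"a": {"total_sessions": 3}, "b": {"total_sessions": 1}, "c": {"total_sessions": 2}}
# User "b" has a single session, so there is no gap to average.
time_btw_session = {"a": {"time_btw_sessions": 90000.0}, "c": {"time_btw_sessions": 40000.0}}

def inner_join(*tables):
    """Keep only the keys present in every table and merge their columns."""
    keys = set(tables[0])
    for table in tables[1:]:
        keys &= set(table)
    return {key: {k: v for table in tables for k, v in table[key].items()}
            for key in sorted(keys)}

joined = inner_join(user, song_session, time_btw_session)
print(list(joined))  # ['a', 'c']
```

If single-session users should stay in the modelling data, a left join would keep them, at the cost of a missing value that then has to be imputed.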

### Persist processed dataframe and save to local file or Amazon S3
At this point, all the preprocessing steps (cleaning, aggregation) have been completed.

The aggregated data is stored on the local file system or in remote storage such as Amazon S3.

These steps save computation time and resources, as all the following steps (analysis, modelling) are done on the aggregated data.

In [29]:
user2 = joined_df.persist()

In [30]:
user2.write.mode("overwrite").parquet("joined_df2.parquet")

In [33]:
joined_df.count()

210

#### Analyse statistics using pandas

The original small dataset has 286,500 rows. After aggregating to the userId level and joining the three dataframes, it is reduced to 210 rows. This is fewer than the 225 distinct users because the inner join with `time_btw_session` drops users who have no value for the time between sessions, such as users with a single session.

The full dataset has 26,259,199 rows, which reduce to 22,278 rows after aggregation.

After aggregation, the analysis can be done with either a Spark dataframe or a pandas dataframe. The following analysis uses pandas because it makes the statistics easier to calculate.

In [33]:
user_pd = user2.toPandas()

#### Numerical features

In [34]:
# convert units, then calculate the mean for users in the not_churned and churned groups
# song lengths are in seconds -> hours; timestamp differences are in milliseconds -> days
user_pd['avg_session_length'] = user_pd['avg_session_length']/(60*60)
user_pd['total_session_length'] = user_pd['total_session_length']/(60*60)
user_pd['time_btw_sessions'] = user_pd['time_btw_sessions']/(60*60*24*1000)

user_pd.groupby('max_churn').mean()

| max_churn | max_hasPaid | max_hasDowngrade | dist_artist | avg_session_length | avg_session_songs | total_session_length | total_songs_played | total_sessions | time_btw_sessions |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.768293 | 0.719512 | 786.993902 | 5.036803 | 70.911949 | 80.635538 | 1164.829268 | 15.506098 | 6.026682 |
| 1 | 0.76087 | 0.73913 | 581.456522 | 4.618036 | 64.868798 | 54.302568 | 786.173913 | 11.152174 | 2.894424 |


**Features with a LOW difference in mean between the not_churned and churned groups:**
- max_hasPaid, max_hasDowngrade

The stats show that whether a user has paid or has downgraded at some point makes little difference to churn.

Those features will NOT be used for modelling.

**Features with a HIGH difference in mean between the not_churned and churned groups:**
- dist_artist
- avg_session_length
- avg_session_songs
- total_session_length
- total_songs_played
- total_sessions
- time_btw_sessions

Those features will be used for modelling.

Churned users tend to spend less time in each session and listen to fewer songs per session (avg_session_length: 4.6 vs 5.0 hours, avg_session_songs: 64.9 vs 70.9).

Oddly, churned users tend to connect more frequently than users who did not churn: their average time between sessions is 2.9 days versus 6.0 days (time_btw_sessions).
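
To make "low" and "high" difference more concrete, the group means can be compared as relative differences. The sketch below copies the means from the table above; the helper `relative_difference` is an illustrative choice of measure, not something the original analysis used.

```python
# Group means copied from the table above as (not churned, churned).
means = {
    "max_hasPaid": (0.768293, 0.76087),
    "max_hasDowngrade": (0.719512, 0.73913),
    "dist_artist": (786.993902, 581.456522),
    "avg_session_length": (5.036803, 4.618036),
    "avg_session_songs": (70.911949, 64.868798),
    "total_session_length": (80.635538, 54.302568),
    "total_songs_played": (1164.829268, 786.173913),
    "total_sessions": (15.506098, 11.152174),
    "time_btw_sessions": (6.026682, 2.894424),
}

def relative_difference(stayed, churned):
    """Change of the churned-group mean relative to the not-churned mean."""
    return (churned - stayed) / stayed

rel = {name: relative_difference(*pair) for name, pair in means.items()}
for name, value in rel.items():
    print(f"{name:22s} {value:+.1%}")
```

On these numbers the two `max_` flags differ by a few percent at most, the per-session averages by somewhat under 10%, and `time_btw_sessions` by roughly half. A difference in means says nothing about spread, so this is only a first screen.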

#### Categorical features

Analysis of user gender

In [64]:
pd.crosstab(user_pd['max_gender'], user_pd['max_churn']).apply(lambda x: x/x.sum(), axis=1)

| max_gender | max_churn = 0 | max_churn = 1 |
|---|---|---|
| F | 0.806122 | 0.193878 |
| M | 0.758929 | 0.241071 |


The churn rate appears higher for males (24%) than for females (19%), so gender will be used as a feature for modelling.

# Modeling

- Split the full dataset into train and test sets
- Test out several machine learning algorithms (LogisticRegression, RandomForest, LinearSVC)
- Evaluate the performance of the various models, tuning parameters as necessary.
- Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
- Determine your winning model based on the test set
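
Spark's `MulticlassClassificationEvaluator` with `metricName='f1'` reports a weighted F-measure: F1 is computed per label and then averaged with weights proportional to each label's frequency. The plain-Python sketch below illustrates that calculation on hypothetical labels; `weighted_f1` is an illustrative helper, not a Spark function.

```python
from collections import Counter

def weighted_f1(y_true, y_pred):
    """F1 per class, averaged with each class weighted by its share of y_true."""
    support = Counter(y_true)
    total = len(y_true)
    score = 0.0
    for label, count in support.items():
        tp = sum(1 for t, p in zip(y_true, y_pred) if t == label and p == label)
        fp = sum(1 for t, p in zip(y_true, y_pred) if t != label and p == label)
        fn = count - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * count / total
    return score

# Hypothetical labels: 8 users stayed (0), 2 churned (1).
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
always_stay = [0] * 10                     # never predicts churn
one_hit = [0, 0, 0, 0, 0, 0, 0, 1, 1, 0]   # one false alarm, one churner caught

print(round(weighted_f1(y_true, always_stay), 3))  # 0.711
print(round(weighted_f1(y_true, one_hit), 3))      # 0.8
```

A model that never predicts churn still scores about 0.71 here, so with imbalanced classes it can help to look at precision and recall for the churned class alongside the weighted score.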

In [50]:
# loading cleaned and aggregated data
df2 = spark.read.parquet("joined_df2.parquet")

In [51]:
# define label column
df2 = df2.withColumnRenamed('max_churn','label')

df2.printSchema()

root
 |-- userId: string (nullable = true)
 |-- max_location: string (nullable = true)
 |-- max_gender: string (nullable = true)
 |-- max_hasPaid: integer (nullable = true)
 |-- max_hasDowngrade: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- dist_artist: long (nullable = true)
 |-- avg_session_length: double (nullable = true)
 |-- avg_session_songs: double (nullable = true)
 |-- total_session_length: double (nullable = true)
 |-- total_songs_played: long (nullable = true)
 |-- total_sessions: long (nullable = true)
 |-- time_btw_sessions: double (nullable = true)



#### Split data into validation and test set

- the validation set is used for cross-validation and grid search
- the test set is used to evaluate the model performance on 'unseen' data

In [52]:
# split data into validation and test set
validation, test = df2.randomSplit([0.9, 0.1], seed=42)

#### define functions for model training and model evaluation

In [67]:
def fit_model(model, paramGrid):
    """
    Build a pipeline and fit a model using CrossValidator and hyperparameter optimization.
    Numerical features are scaled with StandardScaler.
    The categorical feature gender is indexed with StringIndexer.
    The model is fitted on the global `validation` dataframe.
    
    Args:
    model (pyspark.ml.classification): The classifier algorithm
    paramGrid (list): A grid of hyperparameters, as returned by ParamGridBuilder().build()
    
    Returns:
    fitted_model (CrossValidatorModel): A cross-validated fitted model
    
    """
    
    #cv = CountVectorizer(inputCol="artist_list", outputCol="TF")
    #location_index = StringIndexer(inputCol='max_location', outputCol="location_index", handleInvalid='skip')

    gender_index = StringIndexer(inputCol='max_gender', outputCol="gender_index", handleInvalid='keep')

    #scaler
    #"max_hasDowngrade","max_hasPaid",
    numerical = ["dist_artist", "avg_session_length",
                 "avg_session_songs","total_session_length","total_songs_played",
                 "total_sessions","time_btw_sessions"]

    numfeatures = VectorAssembler(inputCols=numerical, outputCol='NumFeatures')
    scaler = StandardScaler(inputCol='NumFeatures', outputCol="ScaledNumFeatures")

    assembler = VectorAssembler(inputCols=[#"TF",
                                           #"location_index",
                                           "gender_index",
                                           "ScaledNumFeatures"
                                          ], 
                                outputCol='features')

    pipeline = Pipeline(stages=[gender_index,
                                numfeatures, 
                                scaler, 
                                assembler,
                                model])

    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                              numFolds=3)

    fitted_model = crossval.fit(validation)
    
    return fitted_model
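
With its default parameters (`withStd=True`, `withMean=False`), Spark's `StandardScaler` divides each feature by its sample standard deviation without centering it. A minimal plain-Python sketch of that scaling for a single column follows; the values and the helper name are illustrative.

```python
import statistics

def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    return [value / std for value in column]

total_sessions = [2.0, 4.0, 6.0, 8.0]  # illustrative values
scaled = scale_to_unit_std(total_sessions)

print(statistics.stdev(scaled))  # approximately 1.0
print(statistics.mean(scaled))   # not 0, because the column is not centered
```

Scaling mainly matters for LogisticRegression and LinearSVC, whose regularization is sensitive to feature magnitude. Tree-based models such as RandomForest are largely unaffected by it.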

In [54]:
def evaluate_results(results):
    """
    Evaluate the predicted values vs the real values
    
    Returns:
    score (float): F1-Score
    """
    score = MulticlassClassificationEvaluator(metricName='f1').evaluate(results)
    return score

### Test several models

#### Logistic Regression

In [40]:
############################
# Logistic Regression
############################
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

lr_grid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .build()

In [41]:
model1 = fit_model(lr, lr_grid)

In [42]:
model1.avgMetrics

[0.8326583420788993, 0.6785281750633183]

In [43]:
test_results = model1.transform(test)
score = evaluate_results(test_results)

print('LogisticRegression f1-score: ', score)

LogisticRegression f1-score:  0.9


#### RandomForest

In [55]:
##########################
# RandomForestClassifier
##########################
rf = RandomForestClassifier()
rf_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth,[4,10,20])\
    .addGrid(rf.numTrees,[15,30])\
    .build()

In [56]:
model2 = fit_model(rf, rf_grid)


In [57]:
model2.avgMetrics

[0.8082284789073787,
 0.8105364781673958,
 0.8065433950052472,
 0.8240975328771643,
 0.8025585617028657,
 0.8240975328771643]
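
`avgMetrics` holds one cross-validated score per hyperparameter combination, so the six values above correspond to the 3 x 2 grid. The sketch below only counts combinations and fits with the values from the RandomForest cell; it makes no claim about the order in which Spark lists them.

```python
from itertools import product

# Grid values copied from the RandomForest cell; numFolds from fit_model.
max_depth_values = [4, 10, 20]
num_trees_values = [15, 30]
num_folds = 3

combinations = list(product(max_depth_values, num_trees_values))
cv_fits = len(combinations) * num_folds

print(len(combinations))  # 6 candidate settings, one avgMetrics entry each
print(cv_fits)            # 18 pipeline fits during cross-validation
```

After cross-validation, `CrossValidator` refits the best setting once more on the whole input dataframe, so the total is one fit higher. This count is a quick way to estimate the cost of a larger grid before running it on the full dataset.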

In [58]:
score = evaluate_results(model2.transform(test))
print('RandomForest f1-score: ', score)

RandomForest f1-score:  0.9423423423423425


#### LinearSVC

In [48]:
############
# LinearSVC
############
lsvc = LinearSVC()
lsvc_grid = ParamGridBuilder() \
    .addGrid(lsvc.maxIter,[10,20])\
    .addGrid(lsvc.regParam,[0.0,0.1])\
    .build()

model3 = fit_model(lsvc, lsvc_grid)

score = evaluate_results(model3.transform(test))

print('Linear SVC f1-score: ', score)

Linear SVC f1-score:  0.8526315789473685


In [49]:
model3.avgMetrics

[0.6673276801369108,
 0.6673276801369108,
 0.6673276801369108,
 0.6673276801369108]

# Conclusion

## **Results on small dataset:**

F1-Score
- LogisticRegression = 0.900
- RandomForest = 0.942
- LinearSVC = 0.853

RandomForest had the best performance on the small dataset, with F1-score = 0.942.

The 3 models were also run on the full dataset on an Amazon EMR cluster.


## **Results on full dataset, run on Amazon EMR Cluster:**

F1-Score
- LogisticRegression = 0.8051
- RandomForest = 0.879
- LinearSVC = 0.679

**Conclusions:**

LinearSVC performed poorly on the full dataset.

RandomForest had the best performance on the full dataset, with F1-score = 0.879.

Based on the results above, I suggest selecting RandomForest for deployment in production, as it showed the best performance on the full dataset and the smallest difference between the results on the small and full datasets.

If computation cost or model explainability is a constraint, I would pick LogisticRegression for deployment in production.

**Future work**

Before deploying in production, we could work on the following steps:
- Run a new training with a larger hyperparameter grid and more folds (numFolds)
- Run a new training with a different algorithm, such as boosting
- Analyse precision vs recall and optimize the threshold based on the business goals. Prioritize precision if the cost of a false positive is high, or recall if the cost of a false negative is high.
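
The precision vs recall trade-off in the last point can be explored by sweeping the decision threshold over predicted churn probabilities. A minimal plain-Python sketch on hypothetical scores and labels follows; `precision_recall` is an illustrative helper.

```python
def precision_recall(y_true, scores, threshold):
    """Precision and recall for the positive class at a given score threshold."""
    predicted = [1 if s >= threshold else 0 for s in scores]
    tp = sum(1 for t, p in zip(y_true, predicted) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, predicted) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, predicted) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 1.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

# Hypothetical true labels and churn probabilities for 8 users.
y_true = [0, 0, 1, 0, 1, 0, 1, 1]
scores = [0.10, 0.30, 0.35, 0.40, 0.55, 0.70, 0.80, 0.90]

for threshold in (0.3, 0.5, 0.75):
    p, r = precision_recall(y_true, scores, threshold)
    print(f"threshold={threshold:.2f} precision={p:.2f} recall={r:.2f}")
```

In this toy data, raising the threshold from 0.3 to 0.75 moves precision from 0.57 to 1.00 while recall falls from 1.00 to 0.50, which is the trade-off the business goal has to settle.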