# Sparkify Project Workspace

I started by analyzing a subset (128MB) of the full dataset available (12GB). I used this subset to explore with Spark, pandas, and plotly before building my web app.

In [1]:
# import libraries
import re
import datetime
import pandas as pd
import numpy as np
import seaborn as sns
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objs as go
import plotly.express as px
import plotly.figure_factory as ff

# spark session
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, avg, sum, isnull, col, concat, desc, explode, lit, min, max, split, udf, \
                                  collect_list
from pyspark.sql.types import IntegerType, FloatType

# spark ML
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.storagelevel import StorageLevel
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import warnings
warnings.filterwarnings('ignore')

In [2]:
# make sure visuals are embeded inside the notebook
init_notebook_mode(connected=True)
%matplotlib inline

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. I will start by Loading and cleaning the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [3]:
# create a Spark session
spark = SparkSession.builder.appName("Sparkify App").getOrCreate()

In [4]:
# load the data from json file into the spark session context 
# the context need to be defined and created before using this function
def read_json(path):
    """Load the data from json file.
    
        Args:
            path (string): path to the json data
            
        Returns:
            Dataframe (df): Pyspark dataframe object for the json data 
        
    """
    df = spark.read.json(path)
    return df

df = read_json("mini_sparkify_event_data.json")
type(df)

pyspark.sql.dataframe.DataFrame

In [5]:
df.head() # check the first row

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [6]:
df.printSchema() # check the schema

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
print((df.count(), len(df.columns))) # number of rows and columns

(286500, 18)


As we can see, we have ```286500``` rows and ```18``` columns in this sample log. I will use ```plotly``` to perform ```EDA analysis``` on this dataset. For more information on ```plotly```, check this [link](https://plot.ly/python/).

# Exploratory Data Analysis

I perform EDA by on the small subset data and doing basic manipulations within `Spark, Pandas and Plotly`. I used `Pandas and plotly` to create interactive plots in this EDA analysis. 

### Define Churn

After finishing the preliminary analysis, I created a column `Churn` to use as the label for the ML model. I used the `Cancellation Confirmation` events to group customers into two churn groups for both paid and free users. 

### Explore Data
Once I've defined churn, I performed some EDA to observe the behavior for users who stayed vs users who churned. I started by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

## Check for Nulls

First, I will check the number of missing records in each column. I created a function called ```check_nulls``` which takes the ```Pyspark Dataframe``` as an object and uses ```plotly``` to create a bar chart for the distribution of missing records in each column.

In [68]:
def check_nulls(df):
    """Create a plot of the number of missing records in each column.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
            
        Returns:
            None  
    """
    df = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
    nulls_df = df.toPandas()
    missing_records = nulls_df.iloc[0]
    column_names = nulls_df.columns.tolist()
    nulls_df = pd.DataFrame({"missing_records": missing_records}).query("missing_records > 0")
    nulls = nulls_df.sort_values('missing_records', ascending=False)
    
    x = [ ' '.join(re.sub('([a-z])([A-Z])', r'\1 \2', name).split()).capitalize() for name in nulls.index.tolist()]
    y=nulls["missing_records"]
    fig = go.Figure([go.Bar(x=x, y=y, text=nulls["missing_records"],
                            marker_color='rgb(55, 83, 109)', textposition='auto', 
                            hovertext=["Missing Records in the `{}` column is {}".format(bar, y[i]) for i, bar in enumerate(x)],
                            hoverinfo="text")])
    
    fig.update_layout(
        title=go.layout.Title(
            text="Number of Missing Records/Column",
            x=0.5,
        ),
        xaxis=go.layout.XAxis(
            title=go.layout.xaxis.Title(
                text="Column Name",
            )
        ),
        yaxis=go.layout.YAxis(
            title=go.layout.yaxis.Title(
                text="Counts",
            )
        )
    )
    
    fig.show()
    

check_nulls(df)

**Observation**
- The top columns with missing records are `artist, length of song and song name`. The `first name, gender, last name, location, registration and user agent` have the same number of missing records.

Let's explore each categorical column to see the distribution of unique values in each one. I created a function called ```counts_plot``` that will take the ```Pyspark Dataframe``` as an input along with some formatting options to return a count plot for the unique values in the selected column.

## Authorization type

In [9]:
def counts_plot(df, col_name, x_axis='Authorization Type', y_axis='Counts', title='Authorization Type Counts', location=False):
    """Create a plot for a counts plot for a column.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
            x_axis (string): x-axis_label
            y_axis (string): y-axis label
            title (string): plot title
            location (bool): If True returns a count plot for the top five locations in the location column
            
        Returns:
            None
        
    """
    x = []
    y = []
    
    if location:
        df = df.select(col_name).toPandas()
        df.dropna(inplace=True)
        df["State"] = df[col_name].apply(lambda x: x.split(',')[1])
        x = ["California", "New York", "Texas", "Massachusetts", 'Florida'] # df["State"].value_counts().index.tolist()[:5]
        y = df["State"].value_counts().tolist()[:5]
    else:
        df = df.select(col_name).groupby(col_name).agg(count(col_name).alias('counts'))
        df = df.toPandas().sort_values('counts', ascending=False)
        y = df["counts"].tolist()
        
        try:
            x = [' '.join(re.sub('([a-z])([A-Z])', r'\1 \2', name).split()).capitalize() for name in df[col_name].tolist()]
        except TypeError:
            x = ["Male" if name=="M" else "Female" for name in df[col_name].unique().tolist()]        
        
    fig = go.Figure([go.Bar(x=x, y=y, text=y, marker_color='rgb(55, 83, 109)', textposition='auto', 
                           hovertext=["Total counts of `{}` is {}".format(x[i], y[i]) for i,bar in enumerate(x)],
                            hoverinfo="text")])
    fig.update_layout(
        title=go.layout.Title(
            text=title,
            x=0.5,
        ),
        xaxis=go.layout.XAxis(
            title=go.layout.xaxis.Title(
                text=x_axis,
            )
        ),
        yaxis=go.layout.YAxis(
            title=go.layout.yaxis.Title(
                text=y_axis,
            )
        )
    )
    
    fig.show()
    
counts_plot(df, 'auth')

**Observation**
- Based on this sample data, most users are registred and have an account on the website. Less than `3 %` of these  users logout of their Sparkify app.

## Top 5 States by Customer Counts

In [10]:
counts_plot(df, 'location',  x_axis='States', y_axis='Counts', title='Top 5 States', location=True)

**Observation**
- Most users are subscribed from California state. New York and Texas share a similar user base size. 

## Page Type Distribution

In [11]:
counts_plot(df, 'page',  x_axis='Page Type', y_axis='Counts', title='Page Distribution')

**Observation**
- As expected, most users interact more with `Next song` page, `79 % of all recorded actions`. `Cancellation and downgrade` requests are less than other events in this log. This suggests imbalanced data when trying to classify customers into churn groups (churned or not).

## Gender Distribution

In [12]:
counts_plot(df, 'gender',  x_axis='Gender Type', y_axis='Counts', title='Gender Distribution')

**Observation**
- We have a similar distribution of customers based on their gender.

## Number of New Customers Each Month

Let's look at the number of new registrations we receive each month. I converted the registration column to a `datetime` column and aggregated the data to group new registred customers by month to generate a time plot for the number of new customers Sparkify have each month.

In [13]:
def new_customers_chart(df):
    """Create a timeline plot for the number of new customers enrolled each month.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
        Returns:
            None
        
    """
        
    registrations = df.select("registration", "userId").toPandas().dropna()
    registrations["date"] = pd.to_datetime([datetime.datetime.fromtimestamp(date/1000) for date in registrations.registration])
    series = registrations.resample('M', on='date').count()["registration"]
    fig = go.Figure()
    x = series.index.tolist()
    y = series.tolist()
    fig.add_trace(go.Scatter(x=x, y=y, line = dict(color='firebrick', width=3),
                             hovertext=["Total number of new customers at `{}` is {}".format(x[i], y[i]) for i,bar in enumerate(x)],
                             hoverinfo="text", mode='lines+markers', name='lines+markers', 
                             fill='toself', fillcolor='rgba(231,107,243,0.2)'))
    fig.update_layout(
            title=go.layout.Title(
                text="Number of New Customers Each Month",
                x=0.5,
            ),
            yaxis=go.layout.YAxis(
                title=go.layout.yaxis.Title(
                    text="Counts",
                )
            )
        )
    fig.show()
    
new_customers_chart(df)

**Observation**
- There was a spike in the number of new customers Sparkify had in `October 2018 (164,834 customers)`. The trend observed here may suggest an increase in the number of new customers in the period between `June and October (summer time)`.

## Hourly Activity on Sparkify

Let's look at the number of active customers at each on the Sparkify app. I converted the `ts` into a `datetime` and grouped users by the hour to generate a timeline of the number of active users at each hour. 

In [14]:
def hourly_log_chart(df):
    """Create a timeline plot for the number of new customers enrolled each month.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
        Returns:
            None
        
    """
    hourly_df = df.select("userId", "page" ,"ts").toPandas()
    hourly_df["hour"] = hourly_df["ts"].apply((lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour))
    hourly_df =  hourly_df[hourly_df["page"] == "NextSong"].groupby("hour").count()
    fig = go.Figure()
    x = hourly_df.index.tolist()
    y = hourly_df.userId
    
    fig.add_trace(go.Scatter(x=x, y=y, line = dict(color='firebrick', width=3),
                             hovertext=["Number of Customers listening to songs at `{}:00` is {}".format(x[i], y[i]) for i,bar in enumerate(x)],
                             hoverinfo="text", mode='lines+markers', name='lines+markers', 
                             fill='toself', fillcolor='rgba(231,107,243,0.2)'))
    fig.update_layout(
            title=go.layout.Title(
                text="Hourly Log of Active Customers",
                x=0.5,
            ),
            xaxis=go.layout.XAxis(
                title=go.layout.xaxis.Title(
                    text="Hour",
                )
            ),
            yaxis=go.layout.YAxis(
                title=go.layout.yaxis.Title(
                    text="Number of Customers",
                )
            )
        )
    fig.show()
    
hourly_log_chart(df)

**Observation**
- The activity on Sparkify increases from 12:00 am to reach its max by 9:00 am. From 9:00 am to 13:00 pm, there is a small drop in the number of active users. After 13:00 pm, The number of active users drops significantly till 11:00 pm.

## Cancellation requests

In [15]:
# number of user submitted cancellation requests
users_churned = df.where((col("page") == "Cancellation Confirmation")).select("userId").distinct().count()
# total number of users
all_users = df.select("userId").distinct().count()
print("Number of users submitted cancellation requests is {} out of {} customers. The churn rate is {:.2f}%" \
      .format(users_churned, all_users, 100*users_churned/all_users ))

Number of users submitted cancellation requests is 52 out of 226 customers. The churn rate is 23.01%


## Clean the data

Next, I will create two DataFrames; `df_churn and df_clean`. The `df_churn` will be created using the `churn_data` function and will contain data related to whether the user has churned or not. The `df_clean` will have data on users with the following columns only; `artist, auth, gender, length, level, location, page, song, ts, userId`. The `df_churn` will be joined with the df_clean inside the `clean_data` function to return a clean data with relevant columns only in addition to the `Churned` column which has data on whether a user churned or not. The `Churned` column will be used as a label in the ML part.

In [16]:
def churn_data(df):
    """Return a DataFrame where users are categorized by their churn group.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
        Returns:
            DataFrame (df): Pyspark Dataframe with churn classfication for each user
    """
    # create a DataFrame with churn users
    df_churn = df.groupby('userId').agg(collect_list('auth').alias("auths"))
    churned = udf(lambda x: 'Cancelled' in x) # find if user cancelled or not
    df_churn = df_churn.withColumn("Churned", churned(df_churn.auths)) # create a column with True or False for Churn users
    df_churn = df_churn.drop('auths') # drop auths - redundant after creating the Churned column
    
    return df_churn

df_churn = churn_data(df)

In [17]:
def clean_data(df, columns=['artist','auth','gender','length','level','location','page','song','ts','userId']):
    """Return a clean DataFrame with the selected columns.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
            columns (list): list of selected columns
        Returns:
            DataFrame (df): clean Pyspark Dataframe
    """
    # keep selected columns only
    df_clean = df.select(columns)
    df_churn = churn_data(df_clean) # get churn DataFrame
    # create a new DataFrame with labels for churn users 
    df_label = df_churn.join(df_clean,'userId')
    
    return df_label

df_clean = clean_data(df)
df_clean.show(5) 

+------+-------+--------------------+---------+------+---------+-----+--------------------+---------+--------------------+-------------+
|userId|Churned|              artist|     auth|gender|   length|level|            location|     page|                song|           ts|
+------+-------+--------------------+---------+------+---------+-----+--------------------+---------+--------------------+-------------+
|100010|  false|Sleeping With Sirens|Logged In|     F|202.97098| free|Bridgeport-Stamfo...| NextSong|Captain Tyin Knot...|1539003534000|
|100010|  false|Francesca Battist...|Logged In|     F|196.54485| free|Bridgeport-Stamfo...| NextSong|Beautiful_ Beauti...|1539003736000|
|100010|  false|              Brutha|Logged In|     F|263.13098| free|Bridgeport-Stamfo...| NextSong|          She's Gone|1539003932000|
|100010|  false|                null|Logged In|     F|     null| free|Bridgeport-Stamfo...|Thumbs Up|                null|1539003933000|
|100010|  false|         Josh Ritter|Logg

## Average Days for Churn Groups

Let's inpect the average number of days for each churn group.

In [18]:
df_clean.groupby("Churned").avg("length").collect()

[Row(Churned='false', avg(length)=249.20913538880816),
 Row(Churned='true', avg(length)=248.63279564406218)]

## Number of Customers in Each Churn Group

We have `174` customers who stayed loyal and `52` who decided to part away.

In [19]:
df_clean.select(["userId","Churned"]).distinct().groupBy("Churned").count().collect()

[Row(Churned='false', count=174), Row(Churned='true', count=52)]

## Number of Thumbs Up

Let's inspect the distribution of thumps up submitted by the user base of Sparkify.

In [20]:
thumbsUp = df_clean.where(df_clean.page=='Thumbs Up').groupby("userId").agg(count(col('page')).alias('ThumbsUp')).orderBy('userId')
thumbsUp.show()

+------+--------+
|userId|ThumbsUp|
+------+--------+
|    10|      37|
|   100|     148|
|100001|       8|
|100002|       5|
|100003|       3|
|100004|      35|
|100005|       7|
|100006|       2|
|100007|      19|
|100008|      37|
|100009|      23|
|100010|      17|
|100012|      18|
|100013|      39|
|100014|      17|
|100015|      35|
|100016|      25|
|100017|       2|
|100018|      46|
|100019|       1|
+------+--------+
only showing top 20 rows



In [21]:
def histogram(df, column, title="Histogram - Number of Thumps Up", x_axis="Number of Thumbs Up", y_axis="Frequency"):
    """Create a histogram for a column with the specified annotations.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
            column (string): the selected numerical column name
            title (string): plot title
            x_axis (string): x-axis_label
            y_axis (string): y-axis label
        Returns:
            None
    """
    series = df.select(column).toPandas()
    fig = go.Figure(data=[go.Histogram(x = series[column], marker_color='rgb(55, 83, 109)')])
    fig.update_layout(
            title=go.layout.Title(
                text = title,
                x = 0.5,
            ),
            xaxis = go.layout.XAxis(
                title = go.layout.xaxis.Title(
                    text = x_axis,
                )
            ),
            yaxis = go.layout.YAxis(
                title = go.layout.yaxis.Title(
                    text = y_axis,
                )
            )
        )
    fig.show()
    
histogram(thumbsUp, "ThumbsUp")

**Observation**
- The histogram for the number of thumps up users submit is highly skewed to the right.

## Number of Thumbs Down

Let's check the number of thumbs down submitted by the users.

In [22]:
thumbsDown = df_clean.where(df_clean.page=='Thumbs Down').groupby("userId").agg(count(col('page')).alias('ThumbsDown')).orderBy('userId')
thumbsDown.show()

+------+----------+
|userId|ThumbsDown|
+------+----------+
|    10|         4|
|   100|        27|
|100001|         2|
|100004|        11|
|100005|         3|
|100006|         2|
|100007|         6|
|100008|         6|
|100009|         8|
|100010|         5|
|100011|         1|
|100012|         9|
|100013|        15|
|100014|         3|
|100015|         8|
|100016|         5|
|100017|         1|
|100018|         9|
|100019|         1|
|100021|         5|
+------+----------+
only showing top 20 rows



In [23]:
histogram(thumbsDown, "ThumbsDown", title="Histogram - Number of Thumps Downs", x_axis="Number of Thumbs Down")

**Observation**
- The histogram for the number of thumps down users submit is highly skewed to the right but at much lower rates than thumps up. Check the range of both histograms.

Let's join all this thumps down/up data into one DataFrame.

In [24]:
allThumbs = thumbsUp.join(thumbsDown,'userId')

## Number of Songs Played

Let's inspect the distribution of songs played by the user base after removing nulls.

In [25]:
songsPlayed = df_clean.where(col('song')!='null').groupby("userId").agg(count(col('song')).alias('SongsPlayed')).orderBy('userId')
songsPlayed.show()

+------+-----------+
|userId|SongsPlayed|
+------+-----------+
|    10|        673|
|   100|       2682|
|100001|        133|
|100002|        195|
|100003|         51|
|100004|        942|
|100005|        154|
|100006|         26|
|100007|        423|
|100008|        772|
|100009|        518|
|100010|        275|
|100011|         11|
|100012|        476|
|100013|       1131|
|100014|        257|
|100015|        800|
|100016|        530|
|100017|         52|
|100018|       1002|
+------+-----------+
only showing top 20 rows



In [26]:
histogram(songsPlayed, "SongsPlayed", title="Histogram - Number of Songs Played", x_axis="Number of Songs Played")

**Observation**
- The histogram is highly skewed to the right. Most users listen to 500 to 1000 songs on average during their subscription. 

## Number of Days Users Stayed Loyal to Sparkify 

Let's inspect the distribution of the number of days users stayed loyal to Sparkify.

In [27]:
days = df_clean.groupby('userId').agg(((max(col('ts')) - min(col('ts')))/86400000).alias("Days"))
days.show()

+------+-------------------+
|userId|               Days|
+------+-------------------+
|100010|  44.21780092592593|
|200002| 45.496805555555554|
|   125|0.02053240740740741|
|   124| 59.996944444444445|
|    51| 15.779398148148148|
|     7| 50.784050925925925|
|    15|  54.77318287037037|
|    54|  42.79719907407407|
|   155|  25.82783564814815|
|100014| 41.244363425925926|
|   132|  50.49740740740741|
|   154| 24.986458333333335|
|   101| 15.861481481481482|
|    11| 53.241585648148146|
|   138|  56.07674768518518|
|300017|  59.11390046296296|
|100021| 45.457256944444445|
|    29|  43.32092592592593|
|    69|  50.98648148148148|
|   112|  56.87869212962963|
+------+-------------------+
only showing top 20 rows



In [28]:
histogram(days, "Days", title="Histogram - Number of Days Users Stayed Loyal to Sparkify", x_axis="Number of Days")

**Observation**
- The histogram is highly skewed to the left. Most users stayed loyal for a period betweeb 50 to 60 days on average during their subscription to Sparkify. 

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

After exploring the sample DataFrame, let's start creating a new DataFrame called `df_features` which will contain reengineered features for the ML modeling.  The new DataFrame `df_features` will contain the following columns:
- userId: unique Id for each user.
- Churned: whether the user is still a customer or not. If the user is still a customer then Churned is False. If not, then True.
- SongsPlayed: the number of Songs played by a user.
- ThumbsUp: the number of times the user voted a song up.
- ThumbsUp: the number of times the user voted a song down.
- Days: the number of days since the user started using Sparkify.
- upPerSong: the ratio of songs voted up by a user.
- downPerSong: the ratio of songs voted down by a user.
- songsPerHour: the ratio between the total number of songs and the played hours since the user subscribed to Sparkify.

Let's start by creating the first four columns. 

In [29]:
# join the df_churn data with songsPlayed
df_features = df_churn.join(songsPlayed,'userId') 

# join the df_features data with allThumbs which contains data on Thumps up/down
df_features = df_features.join(allThumbs,'userId')

# join the df_features data with days
df_features = df_features.join(days, "userId")

# check the df_features
df_features.show()

+------+-------+-----------+--------+----------+------------------+
|userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|
+------+-------+-----------+--------+----------+------------------+
|100010|  false|        275|      17|         5| 44.21780092592593|
|200002|  false|        387|      21|         6|45.496805555555554|
|   124|  false|       4079|     171|        41|59.996944444444445|
|    51|   true|       2111|     100|        21|15.779398148148148|
|     7|  false|        150|       7|         1|50.784050925925925|
|    15|  false|       1914|      81|        14| 54.77318287037037|
|    54|   true|       2841|     163|        29| 42.79719907407407|
|   155|  false|        820|      58|         3| 25.82783564814815|
|100014|   true|        257|      17|         3|41.244363425925926|
|   132|  false|       1928|      96|        17| 50.49740740740741|
|   101|   true|       1797|      86|        16|15.861481481481482|
|    11|  false|        647|      40|         9|

In [30]:
# create user defined functions to create the last three columns
# upPerSong is number of thumbs up divided by the number of songs
upPerSong = udf(lambda numUp, songs: numUp/songs, FloatType())

# downPerSong is number of thumbs down divided by the number of songs
downPerSong = udf(lambda numDown, songs: numDown/songs, FloatType())

# songsPerHour is number of songs divided by the number of hours where the user stayed active in the log
songsPerHour = udf(lambda numSongs, numDays: numSongs/(numDays*24), FloatType())

# create the df_features data frame
df_features = df_features.drop("UpPerSong", "DownPerSong", "SongsPerHour")
df_features = df_features.withColumn("UpPerSong", upPerSong(df_features.ThumbsUp, df_features.SongsPlayed))
df_features = df_features.withColumn("DownPerSong", downPerSong(df_features.ThumbsDown, df_features.SongsPlayed))
df_features = df_features.withColumn("SongsPerHour", songsPerHour(df_features.SongsPlayed, df_features.Days))

In [31]:
df_features.show() # check the created df_features DataFrame

+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+
|userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|
+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+
|100010|  false|        275|      17|         5| 44.21780092592593|0.061818182| 0.018181818|  0.25913393|
|200002|  false|        387|      21|         6|45.496805555555554|0.054263566| 0.015503876|  0.35442048|
|   124|  false|       4079|     171|        41|59.996944444444445| 0.04192204| 0.010051483|   2.8327832|
|    51|   true|       2111|     100|        21|15.779398148148148|0.047370914| 0.009947892|   5.5742517|
|     7|  false|        150|       7|         1|50.784050925925925|0.046666667| 0.006666667| 0.123070136|
|    15|  false|       1914|      81|        14| 54.77318287037037| 0.04231975|0.0073145246|   1.4560045|
|    54|   true|       2841|     163|        2

## Average Songs Per Hour for Churn Groups

In [32]:
# check the average songs per hour for the churn groups
df_features.select("SongsPerHour", "Churned").groupby("Churned").agg(avg(col('SongsPerHour'))).show()

+-------+------------------+
|Churned| avg(SongsPerHour)|
+-------+------------------+
|  false|1.3289544926175187|
|   true| 2.415751484163264|
+-------+------------------+



**Observation**
- Churned users have higher SongsPerHour rate than the other group. This suggests that this new feature can be used in predicting whether the user can stay or leave the service.

# Heat Map

Now let's check the correlation between the numerical features in the DataFrame using a heat map.

In [33]:
def heat_map(df):
    """Create a heatmap for the numerical features of the DataFrame.
    
        Args:
            df (DataFrame): Pyspark Dataframe object
        Returns:
            None
    """
    
    df_heat = df.select('SongsPerHour','SongsPlayed','Days','UpPerSong','DownPerSong').toPandas()
    df_heat = df_heat.corr(method="pearson")
    z= df_heat.values
    x = df_heat.columns.tolist()
    font_colors = ['white']
    colorscale = [[0, 'navy'], [1, 'plum']]
    fig = ff.create_annotated_heatmap(z = z, x = x, y = x, annotation_text = np.around(z, decimals=2), 
                                      colorscale = colorscale, font_colors = font_colors)
    fig.update_layout(
            title=go.layout.Title(
                text = "Heat Map | Numerical Features",
                x = 0.5,
            )
        )
    fig.show()
    
heat_map(df_features)

**Observation**
- There is a strong negative correlation (`-0.49`) between the number of days users stay subscribed to Sparkify, and the number of songs users listen to per hour
- There is a positive correlation (`0.44`) between the number of days users remain subscribed to Sparkify, and the number of songs users listen to.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

### Selecting, Scaling and Transforming The Feature Dataset 

- Logicistic regression is used as a starting point for the ML modeling. It's a good candidate for binary classification problems.
- The numerical features will be used to build the ML model.
- The `Churned` column will be converted to a label column.

In [34]:
numerical_cols = ["SongsPlayed", "UpPerSong", "DownPerSong", "Days", "SongsPerHour"]
assembler = VectorAssembler(inputCols = numerical_cols, outputCol = "FeatureVector")
df_features = assembler.transform(df_features)
scaler = StandardScaler(inputCol="FeatureVector", outputCol="ScaledFeatures", withStd=True)
scalerModel = scaler.fit(df_features)
df_features = scalerModel.transform(df_features)
df_features.show()

+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+
|userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|
+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+
|100010|  false|        275|      17|         5| 44.21780092592593|0.061818182| 0.018181818|  0.25913393|[275.0,0.06181818...|[0.24639992057147...|
|200002|  false|        387|      21|         6|45.496805555555554|0.054263566| 0.015503876|  0.35442048|[387.0,0.05426356...|[0.34675188822240...|
|   124|  false|       4079|     171|        41|59.996944444444445| 0.04192204| 0.010051483|   2.8327832|[4079.0,0.0419220...|[3.65478282185840...|
|    51|   true|       2111|     100|        21|15.779398148148148|0.047370914| 0.009947892|   5.5742517|[2111.0

### Creating Labels for Churn Groups

In [35]:
convertToInt = udf(lambda x: 1 if x=="true" else 0, IntegerType())
df_features = df_features.withColumn('label', convertToInt(df_features.Churned))
df_features.show()

+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+
|userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|              Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|label|
+------+-------+-----------+--------+----------+------------------+-----------+------------+------------+--------------------+--------------------+-----+
|100010|  false|        275|      17|         5| 44.21780092592593|0.061818182| 0.018181818|  0.25913393|[275.0,0.06181818...|[0.24639992057147...|    0|
|200002|  false|        387|      21|         6|45.496805555555554|0.054263566| 0.015503876|  0.35442048|[387.0,0.05426356...|[0.34675188822240...|    0|
|   124|  false|       4079|     171|        41|59.996944444444445| 0.04192204| 0.010051483|   2.8327832|[4079.0,0.0419220...|[3.65478282185840...|    0|
|    51|   true|       2111|     100|        21|15.779398148148148|0.0473709

### Splitting the Data to Training and Testing Datasets

The `df_features` DataFrame is divided to training, validation and testing datasets. This will help preventing the ML model from overfitting or underfitting when learning from the supplied dataset. 

In [36]:
train, test = df_features.randomSplit([0.8, 0.2], seed=42)
train, validation = train.randomSplit([0.8, 0.2], seed=42)

### Training The Logistic Regression Model

As mentioned Logistic regression is used as a starting point for the ML modeling. 

In [37]:
lr = LogisticRegression(featuresCol = 'FeatureVector', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

Let's check the training summary results

In [38]:
training_summary = lrModel.summary
objective_history = training_summary.objectiveHistory

print("objective History:")
for obj in objective_history:
    print(obj)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
training_summary.roc.show()
print("Area under the ROC: " + str(training_summary.areaUnderROC))

objective History:
0.5218621245387386
0.484219381160328
0.4383255965606231
0.40121483945570474
0.39719928524762355
0.3800484935954955
0.36962349709656456
0.3541276846797805
0.3459847651158902
0.34295067376734445
0.33580669284130205
+--------------------+-------------------+
|                 FPR|                TPR|
+--------------------+-------------------+
|                 0.0|                0.0|
|0.007874015748031496|                0.0|
|0.007874015748031496|0.02857142857142857|
|0.007874015748031496|0.05714285714285714|
|0.007874015748031496|0.08571428571428572|
|0.015748031496062992|0.08571428571428572|
|0.015748031496062992|0.11428571428571428|
|0.015748031496062992|0.14285714285714285|
|0.015748031496062992|0.17142857142857143|
|0.015748031496062992|                0.2|
|0.015748031496062992|0.22857142857142856|
|0.023622047244094488|0.22857142857142856|
|0.023622047244094488| 0.2571428571428571|
|0.023622047244094488| 0.2857142857142857|
|0.023622047244094488| 0.314285714285

**Observations**
- For training data, AUC was `0.89`.
- The best cutoff value is `0.38` and the best F-score is `0.72`.

In [39]:
f_measure = training_summary.fMeasureByThreshold
max_f_measure = f_measure.groupBy().max('F-Measure').select('max(F-Measure)').head()
best_threshold = f_measure.where(f_measure['F-Measure'] == max_f_measure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
f_measure.show()

+------------------+-------------------+
|         threshold|          F-Measure|
+------------------+-------------------+
|0.8655645162024803|                0.0|
|0.8603991487453484|0.05405405405405405|
|0.8536839890357993|0.10526315789473684|
|0.8491077815038812|0.15384615384615383|
| 0.835862740538495|               0.15|
|0.8322794248363424|0.19512195121951217|
|0.8065574966643975|0.23809523809523808|
|0.7787801655141317| 0.2790697674418604|
|0.7503131673288344| 0.3181818181818182|
|0.7305390950035189| 0.3555555555555555|
|0.6858882253276974|0.34782608695652173|
|0.6734502434126921| 0.3829787234042553|
|0.6655922013855465|0.41666666666666663|
|0.6432105555312098|0.44897959183673464|
|0.6405194138258704|0.48000000000000004|
|0.6360500392663839| 0.5098039215686275|
|0.6290250299091321| 0.5384615384615384|
|0.5981343652506601| 0.5660377358490566|
|0.5927993574513226| 0.5555555555555555|
|0.5854108624014034| 0.5818181818181818|
+------------------+-------------------+
only showing top

In [40]:
print(best_threshold)
print(max_f_measure)

0.3818777091234905
Row(max(F-Measure)=0.7222222222222223)


In [41]:
pr = training_summary.pr
pr.show()

+-------------------+------------------+
|             recall|         precision|
+-------------------+------------------+
|                0.0|               0.0|
|                0.0|               0.0|
|0.02857142857142857|               0.5|
|0.05714285714285714|0.6666666666666666|
|0.08571428571428572|              0.75|
|0.08571428571428572|               0.6|
|0.11428571428571428|0.6666666666666666|
|0.14285714285714285|0.7142857142857143|
|0.17142857142857143|              0.75|
|                0.2|0.7777777777777778|
|0.22857142857142856|               0.8|
|0.22857142857142856|0.7272727272727273|
| 0.2571428571428571|              0.75|
| 0.2857142857142857|0.7692307692307693|
| 0.3142857142857143|0.7857142857142857|
|0.34285714285714286|               0.8|
|0.37142857142857144|            0.8125|
|                0.4|0.8235294117647058|
|0.42857142857142855|0.8333333333333334|
|0.42857142857142855|0.7894736842105263|
+-------------------+------------------+
only showing top

### Make Prediction on The Validation Dataset

In [42]:
predictions_v = lrModel.transform(validation)
predictions_v.show(5)

+------+-------+-----------+--------+----------+-------------------+-----------+------------+------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|userId|Churned|SongsPlayed|ThumbsUp|ThumbsDown|               Days|  UpPerSong| DownPerSong|SongsPerHour|       FeatureVector|      ScaledFeatures|label|       rawPrediction|         probability|prediction|
+------+-------+-----------+--------+----------+-------------------+-----------+------------+------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|     7|  false|        150|       7|         1| 50.784050925925925|0.046666667| 0.006666667| 0.123070136|[150.0,0.04666666...|[0.13439995667535...|    0|[3.26037000874581...|[0.96304396167816...|       0.0|
|    54|   true|       2841|     163|        29|  42.79719907407407|0.057374164| 0.010207674|    2.765952|[2841.0,0.0573741...|[2.54553517943116...|    1|[0.82277671419

In [43]:
accuracy_v = predictions_v.filter(predictions_v.label == predictions_v.prediction).count()
predictions_v_ct = predictions_v.count()

In [46]:
print("The accuracy on the validation dataset is {:.2f}".format(accuracy_v/predictions_v_ct))

The accuracy on the validation dataset is 0.89


**Observations**
- For the validation dataset, The accuracy was `0.89` and AUC was 1.

In [47]:
evaluator = BinaryClassificationEvaluator()

In [48]:
print('Area under the ROC', evaluator.evaluate(predictions_v))

Area under the ROC 1.0


### Fine Tuning Hyper-parameters

In [49]:
lr.setThreshold(best_threshold)
param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).build()
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=param_grid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
cv_model = crossval.fit(train)
predictions_hp = cv_model.transform(validation)

In [50]:
accuracy_hp = predictions_hp.filter(predictions_hp.label == predictions_hp.prediction).count()
predictions_hp_ct = predictions_hp.count()

In [51]:
print("The accuracy on the validation dataset is {:.2f}".format(accuracy_hp/predictions_hp_ct))

The accuracy on the validation dataset is 0.89


In [52]:
print('Area under the ROC', evaluator.evaluate(predictions_hp))

Area under the ROC 0.9285714285714286


**Observations**
- For validation dataset, The accuracy was `0.89` and AUC was `0.92`.

### Apply the Fine Tuned Model on the Test Dataset

In [53]:
predictions_hp_test = cv_model.transform(test)

In [54]:
accuracy_test_hp = predictions_hp_test.filter(predictions_hp_test.label == predictions_hp_test.prediction).count()
predictions_test_hp_ct = predictions_hp_test.count()

In [55]:
print("The accuracy on the testing dataset is {:.2f}".format(accuracy_test_hp/predictions_test_hp_ct))

The accuracy on the testing dataset is 0.84


In [56]:
print('Area under the ROC', evaluator.evaluate(predictions_hp_test))

Area under the ROC 0.9333333333333335


**Observations**
- For test dataset, accuracy was `0.84` and AUC was `0.93`.

### Applying The Default Model on The Test Dataset

In [57]:
predictions_d = lrModel.transform(test)
accuracy_d = predictions_d.filter(predictions_d.label == predictions_d.prediction).count()
predictions_test_d_ct = predictions_d.count()

In [58]:
print("The accuracy on the testing dataset is {:.2f}".format(accuracy_d/predictions_test_d_ct))

The accuracy on the testing dataset is 0.81


In [59]:
print('Area under the ROC', evaluator.evaluate(predictions_d))

Area under the ROC 0.8904761904761906


**Observations**
- When applying the default model on the testing dataset, the accuracy was `0.81` and AUC was `0.89`.

## Observations

**EDA Analysis**

- The top columns with missing records are artist, length of song and song name. The first name, gender, last name, location, registration and user agent have the same number of missing records.
- Based on this sample data, most users are registred and have an account on the website. Less than `3 %` of these users logout of their Sparkify app.
- Most users are subscribed from California state. New York and Texas share a similar user base size. 
- As expected, most users interact more with Next song page, `79 %` of all recorded actions. Cancellation and downgrade requests are less than other customer groups in this log. This suggests imbalanced data when trying to classify customers into churn groups (churned or not).
- We have a similar distribution of customers based on their gender.
- There was a spike in the number of new customers Sparkify had in `October 2018 (164,834 customers)`. The trend observed here may suggest an increase in the number of new customers in the period between `June and October (summer time)`.
- The activity on Sparkify increases from `12:00 am to reach its max by 9:00 am`. From `9:00 am to 13:00 pm`, there is a small drop in the number of active users. After `13:00 pm`, The number of active users drops significantly till `11:00 pm`.
- Number of users submitted cancellation requests is `52` out of `226` customers. The churn rate is `23.01 %`.
- The histogram for the number of thumps up users submit is highly skewed to the right.
- The histogram for the number of thumps down users submit is highly skewed to the right but at much lower rates than thumps up. Check the range of both histograms.
- The histogram the number of played songs is highly skewed to the right. Most users listen to `500 to 1000` songs on average during their subscription. 
- The histogram for  the number of days user stayed loyal to Spakify is highly skewed to the left. Most users stayed loyal for a period betweeb `50 to 60 days` on average during their subscription to Sparkify. 
- There is a strong negative correlation `(-0.49)` between the number of days users stay subscribed to Sparkify, and the number of songs users listen to per hour
- There is a positive correlation `(0.44)` between the number of days users remain subscribed to Sparkify, and the number of songs users listen to.

**Modeling**
- Logicistic regression was used as a starting point for the ML modeling. It's a good candidate for binary classification problems.
    - For training data, AUC was `0.89`.
    - For training data, the best cutoff value is `0.38` and the best F-score is `0.72`.
    - For the validation dataset, The accuracy was `0.89` and AUC was 1.
    - For the testing dataset,  The accuracy was `0.81` and AUC was `0.89`
- When fine tuning the hyper-parameters:
    - For validation dataset, The accuracy was `0.89` and AUC was `0.92`.
    - For test dataset, accuracy was `0.84` and AUC was `0.93`.
    
 This means that the fine tuned model performed slightly better than the default model.

The results were expored to csv files to build the web application. Please check the README.md File

## Saving The ML Results

In [69]:
# these files will be used to create the web dashboard app
# save original data and exploratory data to csv
df.toPandas().to_csv('./data/original_data.csv')
df_churn.toPandas().to_csv('./data/churn_data.csv')
df_clean.toPandas().to_csv('./data/clean_data.csv')
df_features.toPandas().to_csv('./data/features_data.csv')

# save training, testing and validation data to csv
train.toPandas().to_csv('./data/train_data.csv')
test.toPandas().to_csv('./data/test_data.csv')
validation.toPandas().to_csv('./data/validation_data.csv')

# save model results to csv
training_summary.roc.toPandas().to_csv('./data/roc_data.csv')
f_measure.toPandas().to_csv('./data/f_measure_data.csv')
pr.toPandas().to_csv('./data/pr_data.csv')

In [73]:
days.toPandas().to_csv('./data/days_data.csv')
songsPlayed.toPandas().to_csv('./data/songsPlayed_data.csv')
thumbsUp.toPandas().to_csv('./data/thumbsUp_data.csv')
thumbsDown.toPandas().to_csv('./data/thumbsDown_data.csv')