# Sparkify Project Workspace

This workspace contains a tiny subset (128MB) of the full dataset available (12GB). This notebook is used to explore a smaller subset with Spark before running on the cloud.

In [1]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
from pyspark.sql.functions import avg, col, concat, count, desc, \
asc, explode, lit, min, max, split, stddev, udf, isnan, when, rank, \
log, sqrt, cbrt, exp
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, \
RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, \
PCA, RegexTokenizer, Tokenizer, StandardScaler, StopWordsRemover, \
StringIndexer, VectorAssembler, MaxAbsScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from time import time
import re
import numpy as np
import scipy
import pandas as pd
import datetime
import matplotlib.pyplot as plt
from pandas.plotting import scatter_matrix
import seaborn as sns
import random
%matplotlib inline
random.seed(42)

In [2]:
# create a Spark session
spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify") \
    .getOrCreate()

## 1. Load and Clean Dataset

### 1.1. Load data

In [3]:
# Load from local source as a pilot study
path = 'mini_sparkify_event_data.json'
df = spark.read.json(path)
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [4]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



### 1.2. Check for missing values

In [5]:
# Examine the number of missing values in each column
print(f"Total number of rows in the dataset: {df.count()}.\n")
missing_count_total = 0

for coln in df.columns:
    missing_count = df.filter((isnan(df[coln])) | (df[coln].isNull()) | (df[coln] == "")).count()
    
    if missing_count > 0:
        nan_count = df.filter(isnan(df[coln])).count()
        null_count = df.filter(df[coln].isNull()).count()
        empty_str = df.filter(df[coln] == "").count()
        print(f"{coln}: {missing_count} missing values. \
        {nan_count} NaN's; {null_count} Null's; {empty_str} empty strings.")
        missing_count_total += missing_count
        
if missing_count_total == 0:
    print("No missing values.")

Total number of rows in the dataset: 286500.

artist: 58392 missing values.         0 NaN's; 58392 Null's; 0 empty strings.
firstName: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
gender: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
lastName: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
length: 58392 missing values.         0 NaN's; 58392 Null's; 0 empty strings.
location: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
registration: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
song: 58392 missing values.         0 NaN's; 58392 Null's; 0 empty strings.
userAgent: 8346 missing values.         0 NaN's; 8346 Null's; 0 empty strings.
userId: 8346 missing values.         0 NaN's; 0 Null's; 8346 empty strings.


#### Missing values in `userId` and `sessionId`

In [6]:
# Remove rows that have NaN in either userId or sessionId
num_rows = df.count()
print(f"There are {df.count()} rows in the dataset.")
df = df.dropna(how='any', subset=['userId', 'sessionId'])
print(f"There are {num_rows - df.count()} NaN's in either userId or sessionId. These rows are dropped.")

There are 286500 rows in the dataset.
There are 0 NaN's in either userId or sessionId. These rows are dropped.


In [7]:
# Preview the values of useId
df.select(['userId']).dropDuplicates().orderBy(df['userId']).show(5)

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
+------+
only showing top 5 rows



An `userId` of empty string looks suspicious. To figure out what's going on, I'm going to look at the `page` events for empty `userId`'s in comparison with all `userId`'s.

In [8]:
# Create a view to run SQL queries
df.createOrReplaceTempView("df_table")

In [9]:
# page events of users whose userId IS an empty string
spark.sql(
    '''
    SELECT DISTINCT page
    FROM df_table
    WHERE userId == ""
    '''
).show()

+-------------------+
|               page|
+-------------------+
|               Home|
|              About|
|Submit Registration|
|              Login|
|           Register|
|               Help|
|              Error|
+-------------------+



In [10]:
# page events of users whose userId is NOT an empty string
spark.sql(
    '''
    SELECT DISTINCT page
    FROM df_table
    EXCEPT
    SELECT DISTINCT page
    FROM df_table
    WHERE userId == ""
    '''
).show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|             Upgrade|
|      Submit Upgrade|
+--------------------+



An `userId` of empty string likely represents user(s) who are in the middle of or prior to sign-in or registration. Therefore, the rows corresponding to empty `userId` are dropped.

In [11]:
# Drop rows that have empty userId
df = df.filter(df['userId'] != "")

Because `sessionId` column is numeric, we don't need to worry about empty string problem for this column.

In [12]:
# Check the number of missing values in userId and sessionId
print("Number of missing values:")
for coln in ['userId', 'sessionId', 'firstName', 'gender', 'lastName', 'location', 'registration', 'userAgent']:
    missing_count = df.filter((isnan(df[coln])) | (df[coln].isNull()) | (df[coln] == "")).count()
    print(f"    {coln}: {missing_count} missing values.")

Number of missing values:
    userId: 0 missing values.
    sessionId: 0 missing values.
    firstName: 0 missing values.
    gender: 0 missing values.
    lastName: 0 missing values.
    location: 0 missing values.
    registration: 0 missing values.
    userAgent: 0 missing values.


#### Missing values in `artist`, `length`, `song`

In [13]:
# page events of null artist values
df.filter(df['artist'].isNull()).select(df['page']).dropDuplicates().show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



In [14]:
# page events of NOT null artist values
df.filter(df['artist'].isNotNull()).select(df['page']).dropDuplicates().show()

+--------+
|    page|
+--------+
|NextSong|
+--------+



It seems that only page event "NextSong", i.e. playing a song, has corresponding artist information, which makes sense. This should apply for each of the `artist`, `length`, and `song` columns

In [15]:
# Check the number of missing values of artist, length, song columns when page event is "NextSong"
df_song = df.filter(df['page'] == "NextSong")
print("When page event is 'NextSong':")
for coln in ['artist', 'length', 'song']:
    missing_count = df_song.filter((isnan(df[coln])) | (df[coln].isNull()) | (df[coln] == "")).count()
    print(f"    {coln}: {missing_count} missing values.")

When page event is 'NextSong':
    artist: 0 missing values.
    length: 0 missing values.
    song: 0 missing values.


## 2. Exploratory Data Analysis (EDA)

Perform EDA using a small subset of the full dataset.

### 2.1. Overview of numerical columns

A quick look at descriptive statistics.

### 2.2. Overview of non-numerical columns

A quick look at the possible categories.

### 2.3. Define churn

### 2.3. Explore data
Analyze the behavior for users who stayed vs users who churned.

## 3. Feature Engineering

In this section, I'm going to build out the features that I think useful to train my model on. First, I'm going to do pilot experiment on a small subset of the full dataset. Then, I will use the code on the full dataset.

## 4. Modeling

## 5. Final Steps