In [1]:
# The code was removed by Watson Studio for sharing.

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20190722144804-0000
KERNEL_ID = 290ffe24-c479-433e-9dbd-1fd0d0ed86fc


In [2]:
# import libraries
import re
import datetime
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, isnull
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [3]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [4]:
# Read in full sparkify dataset

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

df = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-ma7s7cezdcowaq'))
df.persist()
#df.take(5)


DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

### Load and Data clean

In [6]:
df.count()

543705

In [7]:
df.head(2)

[Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293'),
 Row(artist="John Brown's Body", auth='Logged In', firstName='Sawyer', gender='M', itemInSession=74, lastName='Larson', length=380.21179, level='free', location='Houston-The Woodlands-Sugar Land, TX', method='PUT', page='NextSong', registration=1538069638000, sessionId=97, song='Bulls', status=200, ts=1538352025000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='98')]

In [9]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



#### Schema information

- artist: Artist name (ex. 'Martin Orford')
- auth: User authentication status (ex. 'Logged In')
- firstName: User first name (ex. 'Joseph')
- gender: Gender (ex. F or M)
- itemInSession: Item count in a session (ex. 20)
- lastName: User last name (ex. 'Morales')
- length: Length of song (ex. 597.55057)
- level: User plan (ex. 'free')
- location: User's location (ex. 'Corpus Christi, TX')
- method: HTTP method (ex. PUT)
- page: Page name (ex. 'NextSong')
- registration: Registration timestamp (unix timestamp) (ex. 1532063507000)
- sessionId: Session ID (ex. 292)
- song: Song (ex. 'Grand Designs')
- status: HTTP status (ex. 200)
- ts: Event timestamp(unix timestamp) (ex. 11538352011000)
- userAgent: User's browswer agent (ex. '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')
- userId: User ID (ex. 293)

In [11]:
df.describe('Length').show()

+-------+------------------+
|summary|            Length|
+-------+------------------+
|  count|            432877|
|   mean|248.66459278007738|
| stddev| 98.41266955051972|
|    min|           0.78322|
|    max|        3024.66567|
+-------+------------------+



In [12]:
# show page event
df.select("page").dropDuplicates().sort("page").show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
+--------------------+
only showing top 20 rows



#### Check missing data

In [30]:
#def count_missing(df, col):
#    """
#    A helper function which count how many missing values in a colum of the dataset.
    
#    This function is useful because the data can be either three cases below:
    
#    1. NaN
#    2. Null
#    3. "" (empty string)
#    """
#    return df.filter((isnan(df[col])) | (df[col].isNull()) | (df[col] == "")).count()

print("[percentage of missing data]\n")
for col in df.columns:
    #print(col)
    #missing_count = df.filter((df[col].isNa()) | (df[col].isNull()) | (df[col] == "")).count() #count_missing(df, col)
    missing_count = df.filter(df[col].isNull()).count()*100 #count_missing(df, col)
    
    if missing_count > 0:
        print("{}: {}".format(col, missing_count/df.count()))

[percentage of missing data]

artist: 20.383847858673363
firstName: 2.8875952952428245
gender: 2.8875952952428245
lastName: 2.8875952952428245
length: 20.383847858673363
location: 2.8875952952428245
registration: 2.8875952952428245
song: 20.383847858673363
userAgent: 2.8875952952428245


In [53]:
df_clean = df.dropna(how = "any", subset = ["userId", "sessionId"])
df_clean = df_clean.filter(df["userId"] != "") # `userId` should not be empty string

In [43]:
print("df:{}".format(df.count()))
print("df_clean: {}".format(df_clean.count())) # no missing values
print("{} rows are removed".format(df.count()-df_clean.count()))


df:543705
df_clean: 528005
15700 rows are removed


In [44]:
num_cols = []
cat_cols = []

for s in df.schema:
    data_type = str(s.dataType)
    if data_type == "StringType":
        cat_cols.append(s.name)
    
    if data_type == "LongType" or data_type == "DoubleType":
        num_cols.append(s.name)

In [34]:
num_cols

['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts']

In [35]:
cat_cols

['artist',
 'auth',
 'firstName',
 'gender',
 'lastName',
 'level',
 'location',
 'method',
 'page',
 'song',
 'userAgent',
 'userId']

In [45]:
df_clean.describe(num_cols).show()

+-------+------------------+------------------+--------------------+------------------+------------------+--------------------+
|summary|     itemInSession|            length|        registration|         sessionId|            status|                  ts|
+-------+------------------+------------------+--------------------+------------------+------------------+--------------------+
|  count|            528005|            432877|              528005|            528005|            528005|              528005|
|   mean|107.77899451709737|248.66459278007738|1.535523414862437E12|2042.9801820058522|209.09106163767387|1.540966927748435...|
| stddev| 116.8647866296988| 98.41266955051972|3.0787254929957166E9|1433.9981489410682|30.148777830591797|1.4812330940351417E9|
|    min|                 0|           0.78322|       1509854193000|                 1|               200|       1538352011000|
|    max|              1005|        3024.66567|       1543073874000|              4808|               40

In [46]:
df_clean.select("auth").dropDuplicates().show()

+---------+
|     auth|
+---------+
|Cancelled|
|Logged In|
+---------+



In [47]:
df_clean.select("level").dropDuplicates().show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [48]:
df_clean.select("method").dropDuplicates().show()

+------+
|method|
+------+
|   PUT|
|   GET|
+------+



In [49]:
df_clean.select("page").dropDuplicates().show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



#### Define Churn
The user whose page has "Cancellation Confirmation" is defined as a churned user in this analysis.

In [56]:
df_clean.filter("page = 'Cancellation Confirmation'").show(5)
print(df_clean.filter("page = 'Cancellation Confirmation'").count())

+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+
|artist|     auth|firstName|gender|itemInSession|lastName|length|level|            location|method|                page| registration|sessionId|song|status|           ts|           userAgent|userId|
+------+---------+---------+------+-------------+--------+------+-----+--------------------+------+--------------------+-------------+---------+----+------+-------------+--------------------+------+
|  null|Cancelled|   Olivia|     F|           40|    Carr|  null| free|      Fort Wayne, IN|   GET|Cancellation Conf...|1536758439000|      490|null|   200|1538400616000|Mozilla/5.0 (Wind...|   208|
|  null|Cancelled|  Lillian|     F|          234| Cameron|  null| paid|        Columbus, OH|   GET|Cancellation Conf...|1533472700000|      471|null|   200|1538482793000|Mozilla/5.0 (Wind...|   231|
|  nu

#### calculate chun rate

In [62]:
churned_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df_churned = df_clean.withColumn("churned", churned_event("page")) 

churned_rate = df_churned.groupby("userId").agg({"churned": "sum"}).select(avg("sum(churned)")).collect()[0]["avg(sum(churned))"]
print("churned rate: {:.2f}%".format(churned_rate * 100))

churned rate: 22.10%


In [64]:
df_churned.select(["userId", "gender", "level", "page", "status", "ts", "churned"]).show(10)

+------+------+-----+--------+------+-------------+-------+
|userId|gender|level|    page|status|           ts|churned|
+------+------+-----+--------+------+-------------+-------+
|   293|     M| free|NextSong|   200|1538352011000|      0|
|    98|     M| free|NextSong|   200|1538352025000|      0|
|   179|     M| paid|NextSong|   200|1538352118000|      0|
|   179|     M| paid|  Logout|   307|1538352119000|      0|
|   246|     F| paid|NextSong|   200|1538352124000|      0|
|   163|     F| paid|NextSong|   200|1538352125000|      0|
|   179|     M| paid|    Home|   200|1538352176000|      0|
|   175|     F| free|NextSong|   200|1538352215000|      0|
|   100|     M| free|    Home|   200|1538352241000|      0|
|   100|     M| free|NextSong|   200|1538352259000|      0|
+------+------+-----+--------+------+-------------+-------+
only showing top 10 rows

