In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, avg
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1560948766617_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()


VBox()

In [3]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')

In [4]:
df.printSchema()

VBox()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
df.describe('userId').show()

VBox()

+-------+------------------+
|summary|            userId|
+-------+------------------+
|  count|          26259199|
|   mean|1488379.8347142653|
| stddev| 286970.0889462398|
|    min|           1000025|
|    max|           1999996|
+-------+------------------+

In [6]:
#drop any null values
user_log_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

VBox()

In [7]:
#keep only userID with value
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

VBox()

In [8]:
user_log_valid.count()

VBox()

26259199

In [9]:
#function to encode in integer value the string "Cancellation Confirmation"
downgrade_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

VBox()

In [10]:
user_log_cancel = user_log_valid.withColumn("Churn", downgrade_event("page"))

VBox()

In [11]:
#Get a column with the hour
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)
user_log_cancel = user_log_cancel.withColumn("hour", get_hour(user_log_cancel.ts))

VBox()

In [12]:
user_log_cancel.head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042', Churn=0, hour=u'0')

In [13]:
user_log_cancel.groupby("Churn").count().show()

VBox()

+-----+--------+
|Churn|   count|
+-----+--------+
|    1|    5003|
|    0|26254196|
+-----+--------+

In [20]:
#Drop any null values in our features of interest
user_log_cancel=user_log_cancel.dropna(how = "any", subset = ["userAgent", "gender", "page","level","Churn"])

VBox()

## Feature Engineering

In [21]:
#build pipeline
Gender_indexer = StringIndexer(inputCol="gender", outputCol='Gender_index')
User_indexer = StringIndexer(inputCol="userAgent", outputCol='User_index')
Page_indexer = StringIndexer(inputCol="page", outputCol='Page_index')
Level_indexer= StringIndexer(inputCol="level", outputCol='Level_index')
indexer = StringIndexer(inputCol="Churn", outputCol="label")

assembler = VectorAssembler(inputCols=["Gender_index", "User_index", "Page_index", "status","Level_index"], outputCol="features")



VBox()

## Modeling

In [22]:
#Split training and testing sets
rest, validation = user_log_cancel.randomSplit([0.8,0.2], seed=42)

VBox()

In [23]:
#logistic model as it is a binary classification, maximum iteraction of 10
lr =  LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

pipeline = Pipeline(stages=[Gender_indexer, User_indexer, Page_indexer,Level_indexer, indexer, assembler, lr])

VBox()

In [24]:
#Tuning our model:
#On the first 80% of the data let's find the most accurate logistic regression model using 3-fold cross-validation with the following parameter grid:
#LogisticRegression regularization parameter: [0.0, 0.1]
    
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .build()


crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3)

VBox()

In [25]:
cvModel_q1 = crossval.fit(rest)

VBox()

In [26]:
cvModel_q1.avgMetrics

VBox()

[1.0, 0.9997076800962561]

In [27]:
results = cvModel_q1.transform(validation)

VBox()

In [28]:
print(results.filter(results.label == results.prediction).count())
print(results.count())

VBox()

5094455
5094455

Without tuning much the parameters, we reached perfect score which is a bit surprising.