# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import libraries
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, desc, asc
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [3]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify Capstone Project for Udacity") \
    .getOrCreate()

In [4]:
# view Spark session configuration
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1639875925745'),
 ('spark.app.name', 'Sparkify Capstone Project for Udacity'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '5525b8ada7c4'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '37437')]

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without user IDs or session IDs.

In [5]:
# load in dataset
user_log = spark.read.json("mini_sparkify_event_data.json")

In [6]:
# view the schema
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
# view some row values
user_log.take(2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

In [8]:
# view list of all possible pages
user_log.select("page").dropDuplicates().sort("page").collect()

[Row(page='About'),
 Row(page='Add Friend'),
 Row(page='Add to Playlist'),
 Row(page='Cancel'),
 Row(page='Cancellation Confirmation'),
 Row(page='Downgrade'),
 Row(page='Error'),
 Row(page='Help'),
 Row(page='Home'),
 Row(page='Login'),
 Row(page='Logout'),
 Row(page='NextSong'),
 Row(page='Register'),
 Row(page='Roll Advert'),
 Row(page='Save Settings'),
 Row(page='Settings'),
 Row(page='Submit Downgrade'),
 Row(page='Submit Registration'),
 Row(page='Submit Upgrade'),
 Row(page='Thumbs Down'),
 Row(page='Thumbs Up'),
 Row(page='Upgrade')]

In [9]:
# view example of a user's chronological actions
user_log.select(["ts", "page", "song"]).where(user_log.userId == "30").orderBy(asc("ts")).collect()

[Row(ts=1538352117000, page='NextSong', song='Rockpools'),
 Row(ts=1538352394000, page='NextSong', song='Time For Miracles'),
 Row(ts=1538352676000, page='NextSong', song='Harder Better Faster Stronger'),
 Row(ts=1538352899000, page='NextSong', song='Passengers (Old Album Version)'),
 Row(ts=1538352905000, page='Add to Playlist', song=None),
 Row(ts=1538353084000, page='NextSong', song='Fuck Kitty'),
 Row(ts=1538353218000, page='NextSong', song='Jade'),
 Row(ts=1538353441000, page='NextSong', song='So-Called Friends'),
 Row(ts=1538353687000, page='NextSong', song='Represent'),
 Row(ts=1538353909000, page='NextSong', song='Here I Am'),
 Row(ts=1538354132000, page='NextSong', song='Rebirthing (Album Version)'),
 Row(ts=1538354365000, page='NextSong', song='Dog Days Are Over (Radio Edit)'),
 Row(ts=1538354584000, page='NextSong', song='Tomorrow Is A Long Time'),
 Row(ts=1538354806000, page='NextSong', song='Halloween Spooks'),
 Row(ts=1538354945000, page='NextSong', song='Stronger'),
 Row

In [10]:
# drop events with a null userId or sessionId, then drop events whose userId is an empty string
user_log_valid = user_log.dropna(how = "any", subset = ["userId", "sessionId"])
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")
user_log_valid.count()

278154
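The cell above keeps an event only when both IDs are present and the user ID is not an empty string; the second filter is needed because an empty-string `userId` is not null, so `dropna` alone would keep it. A plain-Python sketch of the same rule on a few made-up records (the function name `is_valid_event` is hypothetical):

```python
def is_valid_event(event):
    """Keep an event only if sessionId is present and userId is present and non-empty.

    Mirrors dropna(how="any", subset=["userId", "sessionId"])
    followed by filter(userId != "").
    """
    user_id = event.get("userId")
    session_id = event.get("sessionId")
    if user_id is None or session_id is None:
        return False  # removed by dropna
    return user_id != ""  # removed by the empty-string filter


# Hypothetical sample records, not taken from the dataset
events = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 8, "page": "Home"},
    {"userId": None, "sessionId": 5, "page": "Login"},
    {"userId": "9", "sessionId": None, "page": "NextSong"},
]

valid_events = [e for e in events if is_valid_event(e)]
print(len(valid_events))  # 1
```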

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior of users who stayed vs. users who churned. You can start by exploring aggregates on these two groups of users, observing how often they performed a specific action per unit of time or per number of songs played.
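One way to express an action "per number of songs played" is to divide each user's count of that action by the user's `NextSong` count. In Spark this would be a per-user sum of the flag columns followed by a division; the plain-Python sketch below shows the calculation on made-up events (the helper name `actions_per_song` is hypothetical):

```python
from collections import Counter, defaultdict


def actions_per_song(events, page):
    """Return {userId: number of `page` events / number of NextSong events}."""
    counts = defaultdict(Counter)
    for e in events:
        counts[e["userId"]][e["page"]] += 1
    rates = {}
    for user_id, page_counts in counts.items():
        songs = page_counts["NextSong"]
        # users who played no songs get no rate instead of a division by zero
        if songs:
            rates[user_id] = page_counts[page] / songs
    return rates


# Hypothetical events
events = [
    {"userId": "a", "page": "NextSong"},
    {"userId": "a", "page": "NextSong"},
    {"userId": "a", "page": "NextSong"},
    {"userId": "a", "page": "NextSong"},
    {"userId": "a", "page": "Thumbs Down"},
    {"userId": "b", "page": "NextSong"},
    {"userId": "b", "page": "NextSong"},
    {"userId": "c", "page": "Home"},
]

print(actions_per_song(events, "Thumbs Down"))  # {'a': 0.25, 'b': 0.0}
```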

In [11]:
# flag events of interest beginning with churn
# I chose to name my column "churn_flag" instead of "Churn"

flag_churn_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
flag_up_event = udf(lambda x: 1 if x == "Thumbs Up" else 0, IntegerType())
flag_down_event = udf(lambda x: 1 if x == "Thumbs Down" else 0, IntegerType())
flag_listen_event = udf(lambda x: 1 if x == "NextSong" else 0, IntegerType())
flag_playlist_event = udf(lambda x: 1 if x == "Add to Playlist" else 0, IntegerType())

user_log_valid = user_log_valid.withColumn("churn_flag", flag_churn_event("page"))
user_log_valid = user_log_valid.withColumn("up_flag", flag_up_event("page"))
user_log_valid = user_log_valid.withColumn("down_flag", flag_down_event("page"))
user_log_valid = user_log_valid.withColumn("listen_flag", flag_listen_event("page"))
user_log_valid = user_log_valid.withColumn("playlist_flag", flag_playlist_event("page"))
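The five UDFs above share one pattern: compare `page` to a fixed string and return 1 or 0. Keeping the column-to-page mapping in one dictionary makes the list easy to extend; in Spark, a loop over it could call `withColumn` with `when(col("page") == value, 1).otherwise(0)`, a built-in expression that avoids the cost of running a Python UDF. The plain-Python sketch below only illustrates the mapping; `PAGE_FLAGS` and `flag_event` are hypothetical names:

```python
# Hypothetical mapping from flag column name to the page value it marks
PAGE_FLAGS = {
    "churn_flag": "Cancellation Confirmation",
    "up_flag": "Thumbs Up",
    "down_flag": "Thumbs Down",
    "listen_flag": "NextSong",
    "playlist_flag": "Add to Playlist",
}


def flag_event(page):
    """Return a dict of 0/1 flags for one event's page value."""
    return {name: int(page == value) for name, value in PAGE_FLAGS.items()}


print(flag_event("NextSong"))
# {'churn_flag': 0, 'up_flag': 0, 'down_flag': 0, 'listen_flag': 1, 'playlist_flag': 0}
```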


In [12]:
# verify new columns exist as expected
user_log_valid.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30', churn_flag=0, up_flag=0, down_flag=0, listen_flag=1, playlist_flag=0)

In [13]:
# -----add a column to identify whether each user in the dataset ever churns at any point-----
# create a dataset of just users who churned and then join the full dataset to this churn dataset
# resulting in a new column, "ever_churns", that can be used for aggregation

churned_users = user_log_valid.select(["userId","churn_flag"]).where(user_log_valid.churn_flag == 1)
churned_users = churned_users.withColumnRenamed("userId", "userId_churned").withColumnRenamed("churn_flag", "ever_churns")
user_log_joined2 = user_log_valid.join(churned_users,user_log_valid.userId ==  churned_users.userId_churned,"left")
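Because this is a left join, events from users who never reach `Cancellation Confirmation` get `null` in `ever_churns` rather than 0; in Spark, `fillna(0, subset=["ever_churns"])` would turn those into zeros. The join also assumes each user has at most one cancellation event, since a user with two would have every event duplicated. A plain-Python sketch of the same labeling using a set of churned users (the sample events are made up):

```python
# Hypothetical events: (userId, page)
events = [
    ("18", "NextSong"),
    ("18", "Cancellation Confirmation"),
    ("30", "NextSong"),
    ("30", "Thumbs Up"),
]

# users with at least one cancellation event
churned_users = {user_id for user_id, page in events if page == "Cancellation Confirmation"}

# label every event 1/0 by user; set membership cannot duplicate events,
# and users who never churn get 0 instead of None
ever_churns = [(user_id, page, int(user_id in churned_users)) for user_id, page in events]
print(ever_churns)
```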

In [14]:
# compare how many events (rows) come from users who churn vs. users who do not churn
user_log_joined2.groupby("ever_churns").count().show()

+-----------+------+
|ever_churns| count|
+-----------+------+
|       null|233290|
|          1| 44864|
+-----------+------+
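These counts are events, not users: 44,864 events belong to users who churn and 233,290 to users who do not. Together they match the 278,154 valid events counted earlier, and the churned group accounts for about 16% of events. Counting users instead would need a distinct count of `userId` per group (for example with `countDistinct` from `pyspark.sql.functions`). The arithmetic:

```python
# event counts copied from the groupby output above
stayed_events = 233290   # ever_churns is null
churned_events = 44864   # ever_churns == 1

total_events = stayed_events + churned_events
churned_share = churned_events / total_events

print(total_events)             # 278154, matches user_log_valid.count()
print(round(churned_share, 3))  # 0.161
```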



In [15]:
# compare the playlist actions of each group
user_log_joined2.groupby("ever_churns").mean("playlist_flag").show()

+-----------+--------------------+
|ever_churns|  avg(playlist_flag)|
+-----------+--------------------+
|       null|0.023524368811350678|
|          1|0.023136590584878745|
+-----------+--------------------+



In [16]:
# compare the listening actions of each group
user_log_joined2.groupby("ever_churns").mean("listen_flag").show()

+-----------+------------------+
|ever_churns|  avg(listen_flag)|
+-----------+------------------+
|       null|0.8217840456084702|
|          1|0.8112072039942939|
+-----------+------------------+



In [17]:
# compare the thumbs-up actions of each group
user_log_joined2.groupby("ever_churns").mean("up_flag").show()

+-----------+-------------------+
|ever_churns|       avg(up_flag)|
+-----------+-------------------+
|       null|0.04583136868275537|
|          1|0.04143634094151213|
+-----------+-------------------+



In [18]:
# compare the thumbs-down actions of each group
user_log_joined2.groupby("ever_churns").mean("down_flag").show()

+-----------+--------------------+
|ever_churns|      avg(down_flag)|
+-----------+--------------------+
|       null|0.008787346221441126|
|          1|0.011055634807417974|
+-----------+--------------------+
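Dividing each churned-group mean by the non-churned mean puts the four comparisons on one scale. Using the values printed above, a churned user's event is about 26% more likely to be a thumbs down and about 10% less likely to be a thumbs up, while the playlist and listening rates differ by under 2%:

```python
# per-event means copied from the outputs above: (stayed, churned)
means = {
    "playlist_flag": (0.023524368811350678, 0.023136590584878745),
    "listen_flag": (0.8217840456084702, 0.8112072039942939),
    "up_flag": (0.04583136868275537, 0.04143634094151213),
    "down_flag": (0.008787346221441126, 0.011055634807417974),
}

# ratio > 1 means the action is more frequent among users who churn
ratios = {flag: churned / stayed for flag, (stayed, churned) in means.items()}

for flag, ratio in ratios.items():
    print(f"{flag}: {ratio:.2f}")
# playlist_flag: 0.98
# listen_flag: 0.99
# up_flag: 0.90
# down_flag: 1.26
```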



In [19]:
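# thumbs-down actions differ most between the groups: the per-event thumbs-down rate is about 26% higher
# for users who churn (0.0111 vs. 0.0088), while the other three rates differ by about 10% or less.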

# create a dataframe of aggregate data about each user.
# the new dataframe should have the total count of each event and the churn status of each user

In [20]:
# total count of each event per user
metaDf1 = user_log_valid.groupby("userid").sum("up_flag", "down_flag", "listen_flag", "playlist_flag")

In [21]:
# churn status of each user
metaDf2 = user_log_valid.groupby("userid").max("churn_flag").withColumnRenamed("userId", "userId_dup")

In [22]:
# combine the two dataframes of metadata
metaDf = metaDf1.join(metaDf2, metaDf1.userid == metaDf2.userId_dup).drop("userId_dup")
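The two aggregations and the join could also be written as a single `groupBy("userId").agg(...)` call using `sum` and `max` from `pyspark.sql.functions`, which avoids the self-join and the renamed duplicate key. (The mixed `userid`/`userId` spellings above work because Spark resolves column names case-insensitively by default.) The plain-Python sketch below shows the same per-user aggregation in one pass over made-up flagged events; `SUM_COLS` and `per_user` are hypothetical names:

```python
from collections import defaultdict

# Hypothetical flagged events: userId plus the 0/1 flag columns
events = [
    {"userId": "51", "up_flag": 1, "down_flag": 0, "listen_flag": 0, "playlist_flag": 0, "churn_flag": 0},
    {"userId": "51", "up_flag": 0, "down_flag": 0, "listen_flag": 1, "playlist_flag": 0, "churn_flag": 0},
    {"userId": "51", "up_flag": 0, "down_flag": 0, "listen_flag": 0, "playlist_flag": 0, "churn_flag": 1},
    {"userId": "7", "up_flag": 0, "down_flag": 1, "listen_flag": 1, "playlist_flag": 1, "churn_flag": 0},
]

SUM_COLS = ["up_flag", "down_flag", "listen_flag", "playlist_flag"]

# one accumulator per user: sums of the event flags and the max of churn_flag
per_user = defaultdict(lambda: {**{c: 0 for c in SUM_COLS}, "churn": 0})
for e in events:
    agg = per_user[e["userId"]]
    for c in SUM_COLS:
        agg[c] += e[c]
    agg["churn"] = max(agg["churn"], e["churn_flag"])

print(dict(per_user))
```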

In [23]:
# view the results
metaDf.show()

+------+------------+--------------+----------------+------------------+---------------+
|userid|sum(up_flag)|sum(down_flag)|sum(listen_flag)|sum(playlist_flag)|max(churn_flag)|
+------+------------+--------------+----------------+------------------+---------------+
|100010|          17|             5|             275|                 7|              0|
|200002|          21|             6|             387|                 8|              0|
|   125|           0|             0|               8|                 0|              1|
|   124|         171|            41|            4079|               118|              0|
|    51|         100|            21|            2111|                52|              1|
|     7|           7|             1|             150|                 5|              0|
|    15|          81|            14|            1914|                59|              0|
|    54|         163|            29|            2841|                72|              1|
|   155|          58|

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow these steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

### Early Attempts that Failed
I tried various approaches before deciding on my final feature set. I show them at the end of the notebook for completeness and to solicit feedback on the ideas.

### Feature Choice

In [24]:
# vectorize the data features
assembler = VectorAssembler(inputCols=["sum(up_flag)", "sum(down_flag)", "sum(listen_flag)", "sum(playlist_flag)"], outputCol="FeaturesVector")
metaDf = assembler.transform(metaDf)

# normalize the now vectorized features using Normalizer
# Normalizer operates row-wise, scaling each row so that its norm equals 1
scaler = Normalizer(inputCol="FeaturesVector", outputCol="ScaledNumFeatures")
metaDf = scaler.transform(metaDf)

# also try StandardScaler on the feature vector as a second option to test
# StandardScaler operates column-wise; with withStd=True and the default withMean=False it divides
# each feature by its standard deviation but does not subtract the mean
scaler2 = StandardScaler(inputCol="FeaturesVector", outputCol="ScaledNumFeatures2", withStd=True)
scalerModel = scaler2.fit(metaDf)
metaDf = scalerModel.transform(metaDf)
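With its defaults (`withMean=False`, `withStd=True`), Spark's `StandardScaler` divides each feature by that column's corrected sample standard deviation (the n - 1 form) without subtracting the mean, which is why every value in `ScaledNumFeatures2` in the next output is still positive. Note also that the scaler is fit on all of `metaDf` before the train/test split, so test users contribute to the standard deviations; fitting it on the training data only would avoid that. A plain-Python sketch of the scaling on a made-up column:

```python
import statistics

# Hypothetical feature column (e.g., thumbs-up counts for four users)
column = [1.0, 2.0, 3.0, 4.0]

# corrected sample standard deviation (divides by n - 1)
std = statistics.stdev(column)

# withStd=True, withMean=False: scale only, no centering
scaled = [x / std for x in column]

print(round(std, 4))                  # 1.291
print([round(v, 4) for v in scaled])  # [0.7746, 1.5492, 2.3238, 3.0984]
```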

In [25]:
# view the results
metaDf.head()

Row(userid='100010', sum(up_flag)=17, sum(down_flag)=5, sum(listen_flag)=275, sum(playlist_flag)=7, max(churn_flag)=0, FeaturesVector=DenseVector([17.0, 5.0, 275.0, 7.0]), ScaledNumFeatures=DenseVector([0.0617, 0.0181, 0.9976, 0.0254]), ScaledNumFeatures2=DenseVector([0.2596, 0.3823, 0.2489, 0.214]))
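The first row can be checked by hand: `Normalizer` uses the L2 norm by default (`p=2.0`), so each entry of `FeaturesVector` is divided by the square root of that row's sum of squares. Because songs played dwarf the other counts, the normalized vector is dominated by the listening entry. For user `100010`:

```python
import math

# FeaturesVector for userid 100010, copied from the output above
features = [17.0, 5.0, 275.0, 7.0]

# L2 norm of the row: sqrt(17^2 + 5^2 + 275^2 + 7^2) = sqrt(75988)
norm = math.sqrt(sum(x * x for x in features))
normalized = [x / norm for x in features]

print([round(v, 4) for v in normalized])  # [0.0617, 0.0181, 0.9976, 0.0254]
```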

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
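F1 is the harmonic mean of precision and recall on the positive (churn) class, so a model that predicts "no churn" for everyone scores 0 even when its accuracy is high. In Spark it is available through `MulticlassClassificationEvaluator(metricName="f1")`, although that evaluator's `f1` is weighted across both classes rather than computed for the churn class alone; the models below are tuned with `BinaryClassificationEvaluator`, whose default metric is area under the ROC curve. A plain-Python sketch of the positive-class F1 from confusion-matrix counts (the counts are made up):

```python
def f1_score(tp, fp, fn):
    """F1 for the positive class from true positives, false positives, false negatives."""
    if tp == 0:
        return 0.0  # no correct positive predictions: precision or recall is 0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Hypothetical test set of 60 users, 12 of whom churn
print(f1_score(tp=0, fp=0, fn=12))           # 0.0 (always predicts "no churn", accuracy 0.8)
print(round(f1_score(tp=6, fp=4, fn=6), 3))  # 0.545
```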

In [26]:
# split the data set into train and test sets (7:3)
# CrossValidator makes a separate validation set unnecessary: it builds its own validation folds from the training data

train, test = metaDf.randomSplit([0.7, 0.3], seed=42)

In [27]:
# build, train, and test a Logistic Regression model

# regParam=0.0 (no regularization) serves as the baseline; the grid below also tests regParam=0.1,
# which penalizes large coefficients to reduce overfitting
# elasticNetParam=0 makes any penalty pure L2 (ridge) rather than a mix of L1 and L2
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

# build pipeline
lrPipeline = Pipeline(stages=[lr])

# test regParam values of 0.0 and 0.1
lrParamGrid = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1]) \
    .build()

# build CrossValidator with 3 folds
lrCrossval = CrossValidator(estimator=lrPipeline,
                          estimatorParamMaps=lrParamGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

# train model on Normalizer feature vector
lrModel1 = lrCrossval.fit(train.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures").alias("features")))
# train model on StandardScaler feature vector
lrModel2 = lrCrossval.fit(train.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures2").alias("features")))

# test both models
lrResults1 = lrModel1.transform(test.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures").alias("features")))
lrResults2 = lrModel2.transform(test.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures2").alias("features")))

In [28]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per regParam: 0.0, 0.1)
# along with the count of correct predictions and the total number of test users
# see blog post for discussion
print(lrModel1.avgMetrics)
print(lrResults1.filter(lrResults1.label == lrResults1.prediction).count())
print(lrResults1.count())

[0.6726159469635807, 0.6726159469635807]
46
62


In [29]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per regParam: 0.0, 0.1)
# along with the count of correct predictions and the total number of test users
# see blog post for discussion
print(lrModel2.avgMetrics)
print(lrResults2.filter(lrResults2.label == lrResults2.prediction).count())
print(lrResults2.count())

[0.6640595419454332, 0.6640547447048994]
46
62


In [30]:
# build, train, and test a Random Forest model

rf = RandomForestClassifier(featuresCol='features', labelCol='label')

# build pipeline
rfPipeline = Pipeline(stages=[rf])

# test 10 and 100 for the number of trees whose predictions are combined
# I test only two values to limit the processing burden; if 100 trees outperform 10, higher values could be tried
rfParamGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 100]) \
    .build()

# build CrossValidator with 3 folds
rfCrossval = CrossValidator(
    estimator=rfPipeline,
    estimatorParamMaps=rfParamGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3)

# train model on Normalizer feature vector
rfModel1 = rfCrossval.fit(train.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures").alias("features")))
# train model on StandardScaler feature vector
rfModel2 = rfCrossval.fit(train.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures2").alias("features")))

# test both models
rfResults1 = rfModel1.transform(test.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures").alias("features")))
rfResults2 = rfModel2.transform(test.select(col("max(churn_flag)").alias("label"), col("ScaledNumFeatures2").alias("features")))

In [31]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per numTrees: 10, 100)
# along with the count of correct predictions and the total number of test users
# see blog post for discussion
print(rfModel1.avgMetrics)
print(rfResults1.filter(rfResults1.label == rfResults1.prediction).count())
print(rfResults1.count())

[0.6010086490736646, 0.622668073077184]
45
62


In [32]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per numTrees: 10, 100)
# along with the count of correct predictions and the total number of test users
# see blog post for discussion
print(rfModel2.avgMetrics)
print(rfResults2.filter(rfResults2.label == rfResults2.prediction).count())
print(rfResults2.count())

[0.6734975569175038, 0.6574074951618074]
45
62
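Side by side, all four models classify 45 or 46 of the 62 test users correctly (about 73% to 74% accuracy). The best cross-validated area under ROC belongs to the Random Forest on `StandardScaler` features with 10 trees (0.6735), just ahead of Logistic Regression on `Normalizer` features (0.6726). With only 62 test users, one user more or less moves accuracy by about 1.6 percentage points. The arithmetic:

```python
# values copied from the outputs above:
# (best avgMetrics value, correct predictions, total test users)
results = {
    "LR + Normalizer": (0.6726159469635807, 46, 62),
    "LR + StandardScaler": (0.6640595419454332, 46, 62),
    "RF + Normalizer": (0.622668073077184, 45, 62),
    "RF + StandardScaler": (0.6734975569175038, 45, 62),
}

accuracies = {name: correct / total for name, (_, correct, total) in results.items()}
for name, acc in accuracies.items():
    print(f"{name}: CV AUC {results[name][0]:.4f}, test accuracy {acc:.3f}")

best = max(results, key=lambda name: results[name][0])
print(best)  # RF + StandardScaler
```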


# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.

See <a href="https://carrie-kruppa.medium.com/if-i-can-do-ml-in-spark-so-can-you-e32a7b56275c">blog post</a> on Medium for further discussion of this project.

### Early Attempts that Failed
I tried various approaches before deciding on my final feature set. I show them here for completeness and to solicit feedback on the ideas.

In [33]:
# At first I tried to turn the various pages a user could visit into dummy
# columns using code I didn't really understand. 
# The code worked, but I opted not to use it because I didn't grasp it.

"""
from pyspark.sql.functions import when
from pyspark.sql.functions import col

pages = user_log_valid.select("page").distinct().rdd.flatMap(lambda x: x).collect()
pages_expr = [when(col("page") == pg, 1).otherwise(0).alias("PAGE_" + pg) for pg in pages]
user_log_dummies = user_log_valid.select("page", *pages_expr)
user_log_dummies.show()
"""


In [34]:
# Next, I wanted to create a running total of how many of each event a user has had up to the current date
# I wanted to use this for prediction modeling, i.e., identifying when a user's positive activity is slowing down
# I successfully created the features, but I couldn't figure out how to build a model based on the features

from pyspark.sql import Window
from pyspark.sql.functions import sum as Fsum

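# caution: with desc("ts"), each sum runs from the current event through the user's last event (events still to come);
# orderBy(asc("ts")) would give totals up to the current event
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)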
user_log_valid = user_log_valid.withColumn("songs_to_date", Fsum("listen_flag").over(windowval))
user_log_valid = user_log_valid.withColumn("thumbsup_to_date", Fsum("up_flag").over(windowval))
user_log_valid = user_log_valid.withColumn("thumbsdown_to_date", Fsum("down_flag").over(windowval))
user_log_valid = user_log_valid.withColumn("pladds_to_date", Fsum("playlist_flag").over(windowval))
# user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(user_log.userId == "51").sort("ts").collect()
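The sort direction in `windowval` determines what these sums mean. With `orderBy(desc("ts"))` and a frame from `unboundedPreceding` to the current timestamp, each sum covers the current event and every later event by that user. For users who are about to cancel, these totals shrink toward zero as the cancellation approaches, so the features carry information about the future, which may help explain the high area under ROC of the later models. A plain-Python sketch of both directions on one made-up user's listening flags:

```python
from itertools import accumulate

# Hypothetical listen_flag values for one user's events, in ascending ts order
listen_flags = [1, 1, 0, 1, 1]

# ascending order: songs played up to and including each event
to_date = list(accumulate(listen_flags))

# descending order (as in windowval): songs played from each event
# through the user's last event, reported back in ascending ts order
remaining = list(accumulate(reversed(listen_flags)))[::-1]

print(to_date)    # [1, 2, 2, 3, 4]
print(remaining)  # [4, 3, 2, 2, 1]
```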



In [35]:
# vectorizing the features I just created
assembler = VectorAssembler(inputCols=["songs_to_date", "thumbsup_to_date", "thumbsdown_to_date", "pladds_to_date"], outputCol="FeaturesVector")
user_log_valid = assembler.transform(user_log_valid)

# normalize feature vector row-wise
scaler = Normalizer(inputCol="FeaturesVector", outputCol="ScaledNumFeatures")
user_log_valid = scaler.transform(user_log_valid)

# normalize the features column-wise as a second option to test
scaler2 = StandardScaler(inputCol="FeaturesVector", outputCol="ScaledNumFeatures2", withStd=True)
scalerModel = scaler2.fit(user_log_valid)
user_log_valid = scalerModel.transform(user_log_valid)


In [36]:
# Still thinking along the lines of predicting a downward trend of events for each user based on running totals...
# I set out to create the label to go with my feature set.
# I decided to add a column that indicates whether a user is going to churn at any point in the next 3 days
# (259,200,000 ms; the commented-out 604,800,000 ms would be 7 days).
# My thinking is I could train a model on the trend in a user's events and predict whether they will churn within that window.
# I successfully created the label, but I couldn't figure out how to do the modeling. Too complex for me!

# filter on page before selecting, so the filter does not depend on a column dropped by select
churn_users_dates = user_log_valid.where(col("page") == "Cancellation Confirmation").select(["userId", "ts"])
churn_users_dates = churn_users_dates.withColumnRenamed("ts", "churn_ts").withColumnRenamed("userId", "userId2")
user_log_joined = user_log_valid.join(churn_users_dates, user_log_valid.userId == churn_users_dates.userId2, "left")

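def churnInNext3DaysF(churn_ts, ts):
    # 1 if the user's cancellation happens 0 to 3 days after this event, else 0
    if churn_ts is None:  # the user never churns
        return 0
    delta = churn_ts - ts  # milliseconds until cancellation
    if 0 <= delta < 259200000:  # 3 days; 604800000 would be 7 days
        return 1
    return 0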

churnInNext3Days = udf(churnInNext3DaysF, IntegerType())

user_log_joined = user_log_joined.withColumn('churnInNext3Days', churnInNext3Days('churn_ts', 'ts'))
user_log_joined.show(5)


+----------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+-----------+-------------+---------+-------------------+------+-------------+--------------------+------+----------+-------+---------+-----------+-------------+-------------+----------------+------------------+--------------+--------------+-----------------+--------------------+-------+--------+----------------+
|    artist|     auth|firstName|gender|itemInSession| lastName|   length|level|            location|method|       page| registration|sessionId|               song|status|           ts|           userAgent|userId|churn_flag|up_flag|down_flag|listen_flag|playlist_flag|songs_to_date|thumbsup_to_date|thumbsdown_to_date|pladds_to_date|FeaturesVector|ScaledNumFeatures|  ScaledNumFeatures2|userId2|churn_ts|churnInNext3Days|
+----------+---------+---------+------+-------------+---------+---------+-----+--------------------+------+-----------+-------------+---------+---------------
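The label compares millisecond timestamps, so a window of N days is N * 24 * 60 * 60 * 1000 ms: 259,200,000 ms is 3 days and the commented-out 604,800,000 ms is 7 days. A plain-Python check of that arithmetic and of the labeling rule (an event is labeled 1 when the user's cancellation falls 0 to 3 days after it); `churn_within` is a hypothetical name and the timestamps are made up:

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86,400,000 ms

print(3 * MS_PER_DAY)  # 259200000
print(7 * MS_PER_DAY)  # 604800000


def churn_within(churn_ts, ts, days=3):
    """1 if the cancellation happens 0 to `days` days after this event, else 0."""
    if churn_ts is None:  # user never cancels
        return 0
    delta = churn_ts - ts
    return int(0 <= delta < days * MS_PER_DAY)


# Hypothetical timestamps in ms
churn_ts = 10 * MS_PER_DAY
print(churn_within(churn_ts, churn_ts - 1 * MS_PER_DAY))  # 1: one day before cancelling
print(churn_within(churn_ts, churn_ts - 5 * MS_PER_DAY))  # 0: five days before
print(churn_within(None, 0))                              # 0: never cancels
```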

In [37]:
# view entire transaction history of a user I know churned
# verify the field churnInNext3Days has a value of 0 at first and 1 at the end

user_log_joined.select(["page","ts","churnInNext3Days"]).where(user_log_joined.userId == "18").collect()

[Row(page='Home', ts=1538499917000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538499933000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538500208000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538500476000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538500654000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538500842000, churnInNext3Days=0),
 Row(page='Settings', ts=1538500856000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538501009000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538501340000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538501587000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538501740000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538501984000, churnInNext3Days=0),
 Row(page='Settings', ts=1538502021000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538502124000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538502356000, churnInNext3Days=0),
 Row(page='NextSong', ts=1538502626000, churnInNext3Days=0),
 Row(page='NextSong', ts=153
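The next cell splits individual events at random, so events from the same user can land in both the training and test sets, and the test score then partly measures how well the model recognizes users it has already seen. Splitting by user keeps each user's events together; in Spark, one way is to `randomSplit` the distinct `userId` values and join each side back to the events. A plain-Python sketch that assigns whole users with a stable hash (the function name `split_for_user` and the hash-bucket scheme are assumptions, not something this notebook uses):

```python
import hashlib


def split_for_user(user_id, test_fraction=0.3):
    """Deterministically assign a user to 'train' or 'test' from a hash of the ID."""
    digest = hashlib.md5(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # 0..99
    return "test" if bucket < test_fraction * 100 else "train"


# Hypothetical events: (userId, ts)
events = [("18", 1), ("18", 2), ("30", 1), ("30", 2), ("51", 1), ("51", 2)]

splits = {}
for user_id, ts in events:
    splits.setdefault(split_for_user(user_id), []).append((user_id, ts))

# every user's events end up on exactly one side
train_users = {u for u, _ in splits.get("train", [])}
test_users = {u for u, _ in splits.get("test", [])}
print(train_users & test_users)  # set()
```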

In [38]:
train_Full, test_Full = user_log_joined.randomSplit([0.7, 0.3], seed=42)

# build, train, and test a Logistic Regression model

# regParam=0.0 (no regularization) serves as the baseline; the grid below also tests regParam=0.1,
# which penalizes large coefficients to reduce overfitting
# elasticNetParam=0 makes any penalty pure L2 (ridge) rather than a mix of L1 and L2
lr_Full = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)

# build pipeline
lrPipeline_Full = Pipeline(stages=[lr_Full])

# test regParam values of 0.0 and 0.1
lrParamGrid_Full = ParamGridBuilder() \
    .addGrid(lr_Full.regParam,[0.0, 0.1]) \
    .build()

# build CrossValidator with 3 folds
lrCrossval_Full = CrossValidator(estimator=lrPipeline_Full,
                          estimatorParamMaps=lrParamGrid_Full,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

# train model on Normalizer feature vector
lrModel1_Full = lrCrossval_Full.fit(train_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures").alias("features")))
# train model on StandardScaler feature vector
lrModel2_Full = lrCrossval_Full.fit(train_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures2").alias("features")))

# test both models
lrResults1_Full = lrModel1_Full.transform(test_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures").alias("features")))
lrResults2_Full = lrModel2_Full.transform(test_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures2").alias("features")))

In [39]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per regParam: 0.0, 0.1)
# along with the count of correct predictions and the total number of test events
# see blog post for discussion
print(lrModel1_Full.avgMetrics)
print(lrResults1_Full.filter(lrResults1_Full.label == lrResults1_Full.prediction).count())
print(lrResults1_Full.count())

[0.5755427961999795, 0.5776755285920987]
80846
83278


In [40]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per regParam: 0.0, 0.1)
# along with the count of correct predictions and the total number of test events
# see blog post for discussion
print(lrModel2_Full.avgMetrics)
print(lrResults2_Full.filter(lrResults2_Full.label == lrResults2_Full.prediction).count())
print(lrResults2_Full.count())

[0.9016220716172856, 0.894966678736054]
80841
83278


In [41]:
# build, train, and test a Random Forest model

rf_Full = RandomForestClassifier(featuresCol='features', labelCol='label')

# build pipeline
rfPipeline_Full = Pipeline(stages=[rf_Full])

# test 10 and 100 for the number of trees whose predictions are combined
# I test only two values to limit the processing burden; if 100 trees outperform 10, higher values could be tried
rfParamGrid_Full = ParamGridBuilder() \
    .addGrid(rf_Full.numTrees, [10, 100]) \
    .build()

# build CrossValidator with 3 folds
rfCrossval_Full = CrossValidator(
    estimator=rfPipeline_Full,
    estimatorParamMaps=rfParamGrid_Full,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3)

# train model on Normalizer feature vector
rfModel1_Full = rfCrossval_Full.fit(train_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures").alias("features")))
# train model on StandardScaler feature vector
rfModel2_Full = rfCrossval_Full.fit(train_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures2").alias("features")))

# test both models
rfResults1_Full = rfModel1_Full.transform(test_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures").alias("features")))
rfResults2_Full = rfModel2_Full.transform(test_Full.select(col("churnInNext3Days").alias("label"), col("ScaledNumFeatures2").alias("features")))

In [42]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per numTrees: 10, 100)
# along with the count of correct predictions and the total number of test events
# see blog post for discussion
print(rfModel1_Full.avgMetrics)
print(rfResults1_Full.filter(rfResults1_Full.label == rfResults1_Full.prediction).count())
print(rfResults1_Full.count())

[0.8538842208767887, 0.8637761254177923]
80892
83278


In [43]:
# view avgMetrics (area under ROC averaged over the 3 folds, one value per numTrees: 10, 100)
# along with the count of correct predictions and the total number of test events
# see blog post for discussion
print(rfModel2_Full.avgMetrics)
print(rfResults2_Full.filter(rfResults2_Full.label == rfResults2_Full.prediction).count())
print(rfResults2_Full.count())

[0.9134377429780883, 0.9151635211010412]
80861
83278
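Side by side, all four event-level models are correct on about 97% of the 83,278 test events, while their best cross-validated area under ROC ranges from about 0.58 to 0.92. Accuracy barely separates the models, likely because most events are not within three days of a cancellation, so the AUC values are the more informative comparison. Given the descending-order window features and the event-level random split, these scores may overstate how well the approach would predict churn for users the model has not seen. The arithmetic:

```python
# values copied from the outputs above:
# (best avgMetrics value, correct predictions, total test events)
results_full = {
    "LR + Normalizer": (0.5776755285920987, 80846, 83278),
    "LR + StandardScaler": (0.9016220716172856, 80841, 83278),
    "RF + Normalizer": (0.8637761254177923, 80892, 83278),
    "RF + StandardScaler": (0.9151635211010412, 80861, 83278),
}

accuracies = {name: correct / total for name, (_, correct, total) in results_full.items()}
for name, acc in accuracies.items():
    print(f"{name}: CV AUC {results_full[name][0]:.4f}, test accuracy {acc:.4f}")

best = max(results_full, key=lambda name: results_full[name][0])
print(best)  # RF + StandardScaler
```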
