# Data Wrangling with Spark
This workspace contains a tiny subset (128MB) of the full dataset (12GB), so you can explore the data with Spark before deploying your cluster on the cloud. The dataset describes log events coming from a music streaming service, such as logging in to the site, visiting a page, listening to the next song, or seeing an ad.

In [1]:
!pip install --upgrade matplotlib

Requirement already up-to-date: matplotlib in /opt/conda/lib/python3.6/site-packages (3.3.4)


In [2]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, desc, col, hour, dayofmonth, weekofyear, month, year, dayofweek, from_unixtime
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType, TimestampType, DateType

import datetime

import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
# create a Spark session
spark = SparkSession.builder.appName("Spark SQL").getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example, records without userIds or sessionIds.

In [4]:
# path to the mini dataset file in this workspace
path = "mini_sparkify_event_data.json"

In [5]:
# loading the json file into a spark DataFrame
user_log = spark.read.json(path)

# Data Exploration 

In [6]:
# Seeing the columns we have in this dataframe
user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



There are fields describing the user like firstName, lastName and userId. We also have information about the request, for example, the page the user accessed, the HTTP method, and the status of the request.

In [7]:
# looking at one particular record
user_log.show(n=1)

+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|        artist|     auth|firstName|gender|itemInSession|lastName|   length|level|       location|method|    page| registration|sessionId|     song|status|           ts|           userAgent|userId|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
|Martha Tilston|Logged In|    Colin|     M|           50| Freeman|277.89016| paid|Bakersfield, CA|   PUT|NextSong|1538173362000|       29|Rockpools|   200|1538352117000|Mozilla/5.0 (Wind...|    30|
+--------------+---------+---------+------+-------------+--------+---------+-----+---------------+------+--------+-------------+---------+---------+------+-------------+--------------------+------+
only showing top 1 row

Here we can see that Colin was listening to "Rockpools" by Martha Tilston.

In [8]:
# Looking at the first 5 records
user_log.take(5)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9'),
 Row(artist='Adam Lambert', auth='Logged In', firstName='Colin', gender='M', itemInSession=51, lastName='Freeman', length=282.8273, level='paid', location='

In [9]:
# Summary statistics on the length column
user_log.describe("length").show()

+-------+-----------------+
|summary|           length|
+-------+-----------------+
|  count|           228108|
|   mean|249.1171819778458|
| stddev|99.23517921058361|
|    min|          0.78322|
|    max|       3024.66567|
+-------+-----------------+



In [10]:
print("We have {} rows in the dataset".format(user_log.count()))

We have 286500 rows in the dataset


In [11]:
# Looking at the page requests that we have
user_log.select("page").dropDuplicates().sort("page").show(25)

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
|           Thumbs Up|
|             Upgrade|
+--------------------+



Users can log in and out, visit the home page, play songs, rate songs, upgrade or downgrade their subscription, and cancel their account.

In [12]:
user_log.select("level").dropDuplicates().sort("level").show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [13]:
user_log.select("auth").dropDuplicates().sort("auth").show() 

+----------+
|      auth|
+----------+
| Cancelled|
|     Guest|
| Logged In|
|Logged Out|
+----------+



In [14]:
# Looking at the events of userId 30
user_log.select(["userId", "firstName", "page", "song"]).filter(user_log.userId == "30").collect()

[Row(userId='30', firstName='Colin', page='NextSong', song='Rockpools'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Time For Miracles'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Harder Better Faster Stronger'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Passengers (Old Album Version)'),
 Row(userId='30', firstName='Colin', page='Add to Playlist', song=None),
 Row(userId='30', firstName='Colin', page='NextSong', song='Fuck Kitty'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Jade'),
 Row(userId='30', firstName='Colin', page='NextSong', song='So-Called Friends'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Represent'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Here I Am'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Rebirthing (Album Version)'),
 Row(userId='30', firstName='Colin', page='NextSong', song='Dog Days Are Over (Radio Edit)'),
 Row(userId='30', firstName='Coli

As users navigate the application, each of their page requests is logged.

# Calculating Statistics by Hour
How many songs do users listen to in a given hour?

In [16]:
# Converting timestamps to datetime from epoch time, to get the hour of the day
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)

In [17]:
# Adding a new column called hour to our dataframe
user_log = user_log.withColumn("hour", get_hour(user_log.ts))

In [18]:
user_log.select("hour").dropDuplicates().sort("hour").show() 

+----+
|hour|
+----+
|   0|
|   1|
|  10|
|  11|
|  12|
|  13|
|  14|
|  15|
|  16|
|  17|
|  18|
|  19|
|   2|
|  20|
|  21|
|  22|
|  23|
|   3|
|   4|
|   5|
+----+
only showing top 20 rows
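Because `udf` uses `StringType` as its return type when none is given, the `hour` column holds strings, which is why the values above sort as 0, 1, 10, 11, ... instead of numerically. `datetime.datetime.fromtimestamp` without a `tz` argument also converts using the machine's local time zone. The plain-Python sketch below (no Spark, toy timestamps) shows both effects; using `timezone.utc` is an assumption about which time zone you want.

```python
from datetime import datetime, timezone

def ts_ms_to_utc_hour(ts_ms: int) -> int:
    """Convert an epoch timestamp in milliseconds to the hour of day in UTC."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).hour

# Toy timestamps in milliseconds; the first one is Colin's first event
base = 1538352117000
timestamps = [base, base + 2 * 3600 * 1000, base + 10 * 3600 * 1000]
hours = [ts_ms_to_utc_hour(ts) for ts in timestamps]

# Sorting the hours as strings reproduces the lexicographic order seen above
as_strings = sorted(str(h) for h in hours)
# Sorting them as integers gives the natural order
as_ints = sorted(hours)

print(hours)       # [0, 2, 10]
print(as_strings)  # ['0', '10', '2']
print(as_ints)     # [0, 2, 10]
```

In the Spark cell, giving the UDF an explicit return type, for example `udf(lambda x: ..., IntegerType())`, would make `hour` numeric so that it sorts 0, 1, 2, ...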



In [19]:
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0))

In [20]:
user_log = user_log.withColumn('start_time', get_hour(user_log.ts).cast(TimestampType()))

In [21]:
user_log.select("start_time").dropDuplicates().sort("start_time").show() 

+----------+
|start_time|
+----------+
|      null|
+----------+



In [22]:
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)

In [23]:
user_log = user_log.withColumn('start_time', get_hour('ts').cast(TimestampType()))

In [24]:
user_log.select("start_time").dropDuplicates().sort("start_time").show() 

+----------+
|start_time|
+----------+
|      null|
+----------+



In [25]:
user_log = user_log.withColumn('date_again', from_unixtime('ts').cast(DateType()))

In [26]:
user_log.select("start_time").dropDuplicates().sort("start_time").show() 

+----------+
|start_time|
+----------+
|      null|
+----------+



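All three attempts above return `null`. The first UDF has no declared return type, so Spark treats its result as a string, and the datetime it returns does not come back as a string that can be cast to a timestamp. The second UDF returns only the hour of the day, which cannot be cast to a timestamp either. The last cell displays `start_time` again rather than the new `date_again` column, so it simply repeats the earlier result.

Colin's first event has `ts = 1538352117000`, which is 2018-10-01 00:01:57 UTC, so he was listening to Martha Tilston just after midnight (12am).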
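The conversion the `start_time` cells were aiming for can be sketched in plain Python (no Spark): turn the millisecond `ts` into a UTC datetime and pull out the same parts as the time table built later in the notebook. UTC is an assumption here; Spark's `hour` and related functions use the session time zone. Note that Python's `isoweekday()` numbers Monday as 1, while Spark's `dayofweek` numbers Sunday as 1.

```python
from datetime import datetime, timezone

def ts_ms_to_parts(ts_ms: int) -> dict:
    """Break an epoch-millisecond timestamp into UTC date and time parts."""
    dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    iso_year, iso_week, iso_weekday = dt.isocalendar()
    return {
        "start_time": dt,
        "hour": dt.hour,
        "day": dt.day,
        "week": iso_week,        # ISO 8601 week number
        "month": dt.month,
        "year": dt.year,
        "weekday": iso_weekday,  # Python convention: Monday=1 ... Sunday=7
    }

parts = ts_ms_to_parts(1538352117000)  # Colin's first event
print(parts["start_time"].isoformat())  # 2018-10-01T00:01:57+00:00
print(parts["hour"], parts["day"], parts["week"], parts["month"], parts["year"], parts["weekday"])
# 0 1 40 10 2018 1
```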

# Drop Rows with Missing Values
Drop records with missing values, making sure that every remaining record has a userId and a sessionId.

In [None]:
# Dropping missing values
user_log_valid = user_log.dropna(how="any", subset=["userId", "sessionId"])

In [None]:
# Inspecting the distinct userId values to look for invalid entries
user_log.select("userId").dropDuplicates().sort("userId").show()

There are no null values, but some records have an empty string as their userId.

In [None]:
# Looking at logs of users with no userId
user_log.select(["userId", "firstName", "page", "song"]).filter(user_log.userId == "").collect()

In [None]:
# filter for users with blank user id
blank_pages = user_log.filter(user_log.userId == '') \
    .select(col('page') \
    .alias('blank_pages')) \
    .dropDuplicates()

blank_pages.collect()

It seems that the records with no userId come from users who have not signed up yet, or who are logged out and about to log in.
They may also come from users who got an error while registering or logging in.
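The cleaning rule used here, keep a record only if its `userId` and `sessionId` are present and its `userId` is not an empty string, can be sketched in plain Python. The records below are made up for illustration.

```python
# Made-up records: one valid, one empty userId, one null userId,
# one null sessionId, and one more valid record
records = [
    {"userId": "30", "sessionId": 29, "page": "NextSong"},
    {"userId": "", "sessionId": 5, "page": "Home"},
    {"userId": None, "sessionId": 7, "page": "Login"},
    {"userId": "9", "sessionId": None, "page": "NextSong"},
    {"userId": "9", "sessionId": 8, "page": "NextSong"},
]

def is_valid(record: dict) -> bool:
    """Mirror dropna(subset=["userId", "sessionId"]) followed by filter(userId != "")."""
    return (
        record["userId"] is not None
        and record["sessionId"] is not None
        and record["userId"] != ""
    )

valid = [r for r in records if is_valid(r)]
print(len(records) - len(valid), "records dropped")  # 3 records dropped
```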

In [None]:
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

In [None]:
user_log_valid.count()

We lost 8346 records. 

In [None]:
print("We lost {} records".format(user_log.count() - user_log_valid.count()))

In [None]:
female_users = user_log_valid.filter(user_log_valid.gender == 'F') \
    .select('userId', 'gender') \
    .dropDuplicates() \
    .count()

male_users = user_log_valid.filter(user_log_valid.gender == 'M') \
    .select('userId', 'gender') \
    .dropDuplicates() \
    .count()

In [None]:
print("We have {} female users and {} male users".format(female_users, male_users))

How many songs were played from the most played artist?

In [None]:
user_log_valid.filter(user_log_valid.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artistcount') \
    .sort(desc('Artistcount')) \
    .show(1)

# Users Downgrade Their Accounts
We want to distinguish users' activity before and after they downgraded their account from paid to free.

In [None]:
# Users who downgraded their service
user_log_valid.filter("page = 'Submit Downgrade'").show()

In [None]:
# Listing the activity of user 61 who downgraded their service
user_log.select(["userId", "firstname", "page", "level", "song"]).filter(user_log.userId == "61").collect()

After user 61 submitted the downgrade, she became a free user.

We want to create a phase column that divides each user's events at these special events: a user's events should have a different value in the phase column before and after the user downgraded the service.

In [None]:
# Flagging log entries where users downgrade their accounts
# The value is 1 if the event is a Submit Downgrade, and 0 otherwise
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))

In [None]:
user_log_valid.head()

The new `downgraded` column appears as the last field of the row.

In [None]:
# Sorting records for a particular user in reverse time order, and adding up the values to distinguish each user's data 
# as either pre or post downgrade events
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
# Creating a phase column that uses the window function and cumulative sum
user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))

In [None]:
user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]).filter(user_log_valid.userId == "30").sort("ts").collect()

Because the cumulative sum runs over each user's events in reverse time order, every event up to and including the Submit Downgrade event belongs to phase 1, and every event after it belongs to phase 0. A user who downgraded n times would show phase values decreasing from n to 0 over time.
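The window logic behind the phase column can be sketched in plain Python for one hypothetical user who downgrades twice: sort the events newest first, flag Submit Downgrade events, and take a running sum. This sketch assumes unique timestamps; with Spark's `rangeBetween` frame, rows sharing the same `ts` would receive the same sum.

```python
from itertools import accumulate

# Toy (ts, page) events for one hypothetical user, in chronological order
events = [
    (1, "NextSong"),
    (2, "Submit Downgrade"),
    (3, "NextSong"),
    (4, "Submit Downgrade"),
    (5, "NextSong"),
]

def phases(events):
    """Reverse-time cumulative sum of downgrade flags, like Fsum over a window ordered by desc('ts')."""
    ordered = sorted(events, key=lambda e: e[0], reverse=True)   # newest first
    flags = [1 if page == "Submit Downgrade" else 0 for _, page in ordered]
    running = list(accumulate(flags))                            # sum from newest up to this row
    by_ts = {ts: phase for (ts, _), phase in zip(ordered, running)}
    # Return the events in chronological order with their phase attached
    return [(ts, page, by_ts[ts]) for ts, page in events]

for row in phases(events):
    print(row)
# (1, 'NextSong', 2)
# (2, 'Submit Downgrade', 2)
# (3, 'NextSong', 1)
# (4, 'Submit Downgrade', 1)
# (5, 'NextSong', 0)
```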

In [None]:
user_log_valid.select("phase").dropDuplicates().sort("phase").show()

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per unit of time or per number of songs played.

We create a `Churn` column that flags `Cancellation Confirmation` events, and from it a user-level `churn_user` column that we will use as the label for the model.
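The two steps, an event-level flag followed by a per-user maximum, can be sketched in plain Python on toy events (user "2" cancels, user "1" does not). The per-user maximum plays the role of `Fmax('Churn').over(Window.partitionBy('userId'))`.

```python
from collections import defaultdict

# Toy (userId, page) events
events = [
    ("1", "NextSong"),
    ("2", "NextSong"),
    ("2", "Cancel"),
    ("2", "Cancellation Confirmation"),
    ("1", "Thumbs Up"),
]

# Event-level label, like the Churn column
churn_event = [1 if page == "Cancellation Confirmation" else 0 for _, page in events]

# User-level label: the maximum flag over all of a user's events
churn_user_by_id = defaultdict(int)
for (user_id, _), flag in zip(events, churn_event):
    churn_user_by_id[user_id] = max(churn_user_by_id[user_id], flag)

# Broadcast the user-level label back to every event, like the churn_user column
churn_user = [churn_user_by_id[user_id] for user_id, _ in events]
print(churn_event)  # [0, 0, 0, 1, 0]
print(churn_user)   # [0, 1, 1, 1, 0]
```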

In [None]:
# Adding a new column called Churn to our dataframe
# The value is 1 if the event is a Cancellation Confirmation, and 0 otherwise

flag_churn_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
user_log_valid = user_log_valid.withColumn("Churn", flag_churn_event("page"))

In [None]:
# Create the churn_user column: 1 on every event of a user who ever cancelled, 0 otherwise
user_log_valid = user_log_valid.withColumn('churn_user', Fmax(
    'Churn').over(Window.partitionBy('userId')))

In [None]:
user_log_valid.groupBy('churn_user').count().show()

In [None]:
user_log_valid.head()

Aggregates: how much of a specific action each group experienced per unit of time.
+--------------------+
|                page|
+--------------------+
|          Add Friend|
|     Add to Playlist|
|               Error|
|                Help|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|         Thumbs Down|
|           Thumbs Up|
+--------------------+

In [None]:
user_log = user_log.withColumn("hour", get_hour(user_log.ts))

In [None]:
# Build start_time directly from ts: divide the epoch milliseconds by 1000 to get seconds,
# then cast to TimestampType (a numeric-to-timestamp cast reads the value as seconds since the epoch)
df = user_log.withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))
# extract columns to create time table
time_table = df.select('start_time')
time_table = time_table.withColumn('hour', hour('start_time'))
time_table = time_table.withColumn('day', dayofmonth('start_time'))
time_table = time_table.withColumn('week', weekofyear('start_time'))
time_table = time_table.withColumn('month', month('start_time'))
time_table = time_table.withColumn('year', year('start_time'))
time_table = time_table.withColumn('weekday', dayofweek('start_time'))
time_table.head(5)

In [None]:
# Counting NextSong requests per hour for users who did not churn (churn_user == 0)
songs_in_hour = user_log_valid.filter((user_log_valid.page == "NextSong") & (user_log_valid.churn_user == 0)).groupby(user_log_valid.hour).count().orderBy(user_log_valid.hour.cast("float"))
songs_in_hour_pd = songs_in_hour.toPandas()
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24)
# plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs Played");

In [None]:
# Counting NextSong requests per hour for users who churned (churn_user == 1)
songs_in_hour = user_log_valid.filter((user_log_valid.page == "NextSong") & (user_log_valid.churn_user == 1)).groupby(user_log_valid.hour).count().orderBy(user_log_valid.hour.cast("float"))
songs_in_hour_pd = songs_in_hour.toPandas()
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24)
# plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs Played");

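The two scatter plots show how many songs non-churned and churned users played in each hour of the day. Because churned users are a smaller group, compare the shapes of the two curves rather than their absolute counts.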
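One way to make the two groups directly comparable, offered here as a suggestion rather than something the notebook does, is to divide each hour's song count by the number of distinct users in the group. A plain-Python sketch on made-up (userId, hour) NextSong events for one group:

```python
from collections import Counter

# Made-up (userId, hour) pairs for NextSong events of one group of users
song_events = [("a", 0), ("a", 0), ("b", 0), ("b", 13), ("c", 13), ("d", 13), ("d", 13), ("d", 13)]

n_users = len({user for user, _ in song_events})
counts = Counter(hour for _, hour in song_events)

# Songs played per user in each hour, comparable across groups of different size
songs_per_user = {hour: count / n_users for hour, count in sorted(counts.items())}
print(songs_per_user)  # {0: 0.75, 13: 1.25}
```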

In [None]:
# # Counting the help page requests
# help_in_hour = user_log_valid.filter((user_log_valid.page == "Help") & (user_log_valid.churn_user == 0)).groupby(user_log_valid.hour).count().orderBy(user_log_valid.hour.cast("float"))
# help_in_hour_pd = help_in_hour.toPandas()
# plt.scatter(help_in_hour_pd["hour"], help_in_hour_pd["count"])
# plt.xlim(-1, 24)
# # plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
# plt.xlabel("Hour")
# plt.ylabel("Helps Requested");

In [None]:
# # Counting the help page requests
# help_in_hour_chrun = user_log_valid.filter((user_log_valid.page == "Help") & (user_log_valid.churn_user == 1)).groupby(user_log_valid.hour).count().orderBy(user_log_valid.hour.cast("float"))
# help_in_hour_churn_pd = help_in_hour_chrun.toPandas()
# plt.scatter(help_in_hour_churn_pd["hour"], help_in_hour_churn_pd["count"])
# plt.xlim(-1, 24)
# # plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
# plt.xlabel("Hour")
# plt.ylabel("Helps Requested");

In [None]:
# # Counting the Error page requests
# register_in_hour = user_log_valid.filter((user_log_valid.page == "Error") & (user_log_valid.churn_user == 0)).groupby(user_log_valid.hour).count().orderBy(user_log_valid.hour.cast("float"))
# register_in_hour_pd = register_in_hour.toPandas()
# register_in_hour_pd

In [None]:
# login log out per day

In [None]:
# Thumbs Up and Thumbs Down counts for non-churned users (to compare the down/up ratio)
user_log_valid.filter((user_log_valid.churn_user == 0) & ((user_log_valid.page == 'Thumbs Up') | (user_log_valid.page == 'Thumbs Down'))).groupby("page").count().show()

In [None]:
# Thumbs Up and Thumbs Down counts for churned users (to compare the down/up ratio)
user_log_valid.filter((user_log_valid.churn_user == 1) & ((user_log_valid.page == 'Thumbs Up') | (user_log_valid.page == 'Thumbs Down'))).groupby("page").count().show()
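The cells above print raw counts; the down/up ratio can then be computed from them. A plain-Python sketch with hypothetical counts (not the notebook's results):

```python
def thumbs_down_ratio(counts: dict) -> float:
    """Thumbs Down events per Thumbs Up event, from a page -> count mapping."""
    return counts.get("Thumbs Down", 0) / counts["Thumbs Up"]

# Hypothetical page counts for two groups (illustration only)
stayed = {"Thumbs Up": 800, "Thumbs Down": 160}
churned = {"Thumbs Up": 150, "Thumbs Down": 45}

print(thumbs_down_ratio(stayed))   # 0.2
print(thumbs_down_ratio(churned))  # 0.3
```

In Spark, `groupBy('churn_user').pivot('page', ['Thumbs Up', 'Thumbs Down']).count()` would put both counts for each group on a single row, which makes the ratio a simple column expression.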

In [None]:
# Number of visits to each page for non-churned users
user_log_valid.filter(user_log_valid.churn_user == 0).groupby("page").agg({'page':'count'}).show()

In [None]:
# Number of visits to each page for churned users
user_log_valid.filter(user_log_valid.churn_user == 1).groupby("page").agg({'page':'count'}).show()
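The cells above print absolute visit counts. To compare groups of different size, each page's count can be divided by the group's total number of page visits. A plain-Python sketch with hypothetical counts for one group:

```python
def page_shares(counts: dict) -> dict:
    """Fraction of all page visits that went to each page."""
    total = sum(counts.values())
    return {page: count / total for page, count in counts.items()}

# Hypothetical per-page visit counts for one group of users
counts = {"NextSong": 80, "Home": 15, "Help": 5}
shares = page_shares(counts)
print(shares)  # {'NextSong': 0.8, 'Home': 0.15, 'Help': 0.05}
```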

In [None]:
# Aggregates, number of songs

In [None]:
# How many songs do non-churned users listen to on average between visits to the home page?
# flag_home_visit returns 1 for a Home page request and 0 otherwise
flag_home_visit = udf(lambda page: int(page == 'Home'), IntegerType())

# Per-user window ordered newest first; the running sum of home visits numbers the periods
user_window = Window \
    .partitionBy('userId') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

# Keep churn_user in the projection so that it can be filtered on below
cusum = user_log_valid.filter((user_log_valid.page == 'NextSong') | (user_log_valid.page == 'Home')) \
    .select('userId', 'page', 'ts', 'churn_user') \
    .withColumn('homevisit', flag_home_visit(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong') & (cusum.churn_user == 0)) \
    .groupBy('userId', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

In [None]:
# How many songs do churned users listen to on average between visits to the home page?
cusum.filter((cusum.page == 'NextSong') & (cusum.churn_user == 1)) \
    .groupBy('userId', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()
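The same computation, counting NextSong events in each period delimited by Home visits and averaging over (user, period) pairs, can be sketched in plain Python for a single hypothetical user. Spark partitions the running sum by `userId`; this sketch handles one user only.

```python
from collections import Counter
from itertools import accumulate

# Toy (userId, ts, page) events for one hypothetical user
events = [
    ("u1", 1, "Home"),
    ("u1", 2, "NextSong"),
    ("u1", 3, "NextSong"),
    ("u1", 4, "Home"),
    ("u1", 5, "NextSong"),
    ("u1", 6, "NextSong"),
    ("u1", 7, "NextSong"),
    ("u1", 8, "NextSong"),
]

def avg_songs_between_home_visits(events):
    # Newest first, matching the window's desc('ts') ordering
    newest_first = sorted(events, key=lambda e: e[1], reverse=True)
    home_flags = [1 if page == "Home" else 0 for _, _, page in newest_first]
    periods = accumulate(home_flags)  # like Fsum('homevisit') over the window
    # Count NextSong events per (user, period), like groupBy('userId', 'period')
    songs_per_period = Counter(
        (user, period)
        for (user, _, page), period in zip(newest_first, periods)
        if page == "NextSong"
    )
    # Average over periods, like .agg({'count(period)': 'avg'})
    return sum(songs_per_period.values()) / len(songs_per_period)

print(avg_songs_between_home_visits(events))  # 3.0
```

Here the four songs after the last Home visit form one period and the two songs between the Home visits form another, so the average is (4 + 2) / 2.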

In [None]:
user_log_valid.head(1)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
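The reason for preferring F1 when churned users are rare: a model that never predicts churn can still score high accuracy. A plain-Python sketch of the positive-class F1, F1 = 2 * precision * recall / (precision + recall), on hypothetical labels. Spark ML's `MulticlassClassificationEvaluator` with `metricName="f1"` reports an F1 weighted across both classes, which differs from the positive-class F1 computed here.

```python
def f1_score(y_true, y_pred, positive=1):
    """F1 = 2 * precision * recall / (precision + recall) for the positive class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

# Hypothetical labels: 2 churned users out of 10
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
always_stay = [0] * 10                        # never predicts churn
some_model = [0, 0, 0, 0, 0, 0, 0, 1, 1, 0]   # one hit, one miss, one false alarm

accuracy = sum(t == p for t, p in zip(y_true, always_stay)) / len(y_true)
print(accuracy)                        # 0.8
print(f1_score(y_true, always_stay))   # 0.0
print(f1_score(y_true, some_model))    # 0.5
```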

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.