# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

### Metrics to consider:
- Monthly active users
- Daily active users in past month
- Total paid and unpaid users
- Total ads served in the past month
- Cohort per month: % of users who cancelled, % of users who upgraded
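
The first two metrics come down to counting distinct users per period. Below is a minimal plain-Python sketch. It assumes each event is a dict with a `userId` and a millisecond `ts` (as in the Sparkify log), and that periods are taken in UTC. The helper `active_users` is hypothetical and not part of the project. In Spark the same idea would be a `groupBy` on the period with `countDistinct('userId')`.

```python
from collections import defaultdict
from datetime import datetime, timezone

def active_users(events):
    """Return (monthly, daily) dicts mapping a period to its set of active userIds.

    Assumes each event is a dict with a 'userId' and a 'ts' in milliseconds;
    periods are computed in UTC.
    """
    monthly = defaultdict(set)
    daily = defaultdict(set)
    for event in events:
        user = event.get("userId")
        if not user:  # skip events with an empty or missing userId
            continue
        moment = datetime.fromtimestamp(event["ts"] / 1000, tz=timezone.utc)
        monthly[(moment.year, moment.month)].add(user)
        daily[moment.date()].add(user)
    return monthly, daily
```

Monthly active users is then `len(monthly[(year, month)])`, and the daily counts for the past month are the lengths of the matching `daily` entries.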


# Import Libraries

In [5]:
# import libraries
import seaborn as sns
import pandas as pd
import numpy as np
import datetime  # needed by the timestamp UDFs; leaving it out (together with running "pip install --upgrade pyspark") caused errors in the time-series cells
import matplotlib.pyplot as plt
plt.style.use('ggplot')
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
#import cufflinks as cf
#cf.go_offline()
# explicit imports only: "from pyspark.sql.functions import *" would shadow Python's sum, max, round, ...
from pyspark.sql.functions import isnan, count, when, col, desc, asc, avg, udf, sort_array
from pyspark.sql.functions import sum as Fsum, max as Fmax
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, Normalizer, StandardScaler, MinMaxScaler
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC, GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

In [None]:
!pip install plotly
!pip install cufflinks
import plotly.express as px  # px is used by the EDA bar charts below

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [6]:
# create a Spark session
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

In [7]:
spark

In [8]:
print(spark.catalog.listTables())

[]


In [9]:
pwd

'/home/workspace'

In [10]:
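# local copies of the data; inside the workspace (/home/workspace) point `mini` at 'mini_sparkify_event_data.json' instead
mini = '/home/freemo/Projects/largeData/mini_sparkify_event_data.json'
medium = '/home/freemo/Projects/largeData/medium_sparkify_event_data.json'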

In [11]:
df = spark.read.json(mini)

AnalysisException: 'Path does not exist: file:/home/freemo/Projects/largeData/mini_sparkify_event_data.json;'

In [None]:
df.take(1)

In [None]:
df.show(5)

In [None]:
df.printSchema()

In [None]:
# check columns with Null values
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [None]:
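# missing values in userId: they are stored as empty strings (hence the userId != "" filter below), which isnan does not catch
df.select(count(when(col('userId').isNull() | (col('userId') == ''), True)).alias('missing_userId')).show()

In [None]:
# missing values in sessionId
df.select(count(when(col('sessionId').isNull(), True)).alias('missing_sessionId')).show()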

In [None]:
# get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)
# df = df.withColumn("hour", get_hour(df.ts))
# songs_in_hour = df.filter(df.page == "NextSong").groupBy(df.hour).count().orderBy(df.hour.cast("float"))
# songs_in_hour.show()

# songs_in_hour_pd = songs_in_hour.toPandas()
# songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

In [None]:
# create udfs (user defined functions) for the hour, day, weekday and month columns
day_name= ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday','Sunday']
day_ind = [1,2,3,4,5,6,7]
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).hour, IntegerType() )
get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).day, IntegerType() )
get_wkday = udf(lambda x: day_name[datetime.datetime.fromtimestamp(x/1000).weekday()] )
get_month = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).month, IntegerType() )
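
Each UDF converts the millisecond `ts` into one calendar field. The plain-Python sketch below shows what all four extract for a single timestamp. Unlike the UDFs, it fixes the timezone to UTC: `fromtimestamp()` without a timezone returns the local time of whichever machine runs the UDF. The helper `time_parts` is hypothetical.

```python
from datetime import datetime, timezone

DAY_NAMES = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

def time_parts(ts_ms):
    """Split a millisecond epoch timestamp into the fields the UDFs above extract (in UTC)."""
    moment = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return {
        'hour': moment.hour,
        'day': moment.day,
        'workday_': DAY_NAMES[moment.weekday()],  # weekday(): Monday == 0
        'month': moment.month,
    }
```

One untested alternative is Spark's built-in `hour`, `dayofmonth`, `month` and `date_format` functions on a timestamp column, which would avoid the Python UDF round trip.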

In [None]:
# create new columns based on ts column for analyses
df = df.withColumn('hour', get_hour('ts'))
df = df.withColumn('day', get_day('ts'))
df = df.withColumn('workday_', get_wkday('ts'))
df = df.withColumn('month', get_month('ts'))

In [None]:
df.select('hour', 'day', 'workday_', 'month').show(7)

In [None]:
# df.groupBy('ts_hour').count().show()

In [None]:
# df.groupBy('ts_day').count().show(31)

In [None]:
user_log_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

In [None]:
user_log_valid.count()

In [None]:
df.select("userId").dropDuplicates().sort("userId").show()

In [None]:
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")
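
The two cleaning steps can be sketched in plain Python. `dropna` removes nulls, but missing userIds in this log appear as empty strings, so a second filter is needed. The sketch assumes events are plain dicts; `valid_events` is a hypothetical helper.

```python
def valid_events(events):
    """Keep events that have a sessionId and a non-empty userId.

    Mirrors dropna(subset=["userId", "sessionId"]) followed by the userId != "" filter.
    """
    return [
        e for e in events
        if e.get("sessionId") is not None
        and e.get("userId") is not None
        and e.get("userId") != ""
    ]
```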

In [None]:
user_log_valid.count() - 286500

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.
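
The model needs one label per user, but a `Cancellation Confirmation` flag is set on a single event. A user therefore counts as churned if any of their events is a cancellation, which is the maximum of the event-level flag. The plain-Python sketch below assumes dict events with `userId` and `page`; `churn_labels` is hypothetical. In Spark the equivalent is a `groupBy('userId')` with the max of `churn`.

```python
def churn_labels(events):
    """Map each userId to 1 if it has a 'Cancellation Confirmation' event, else 0."""
    labels = {}
    for e in events:
        user = e["userId"]
        churned = 1 if e["page"] == "Cancellation Confirmation" else 0
        labels[user] = max(labels.get(user, 0), churned)
    return labels
```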

In [None]:
# check the page column
df.select("page").dropDuplicates().sort("page").show()

In [None]:
# How many songs do users listen to on average between visiting the home page? 

function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()
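
The window above gives each event a `period`: the number of `Home` visits at or after that event, counted from the user's latest event backwards. Songs with the same period were therefore played between the same two home visits. Below is a plain-Python sketch of that logic. It assumes dict events with `userId`, `page` and `ts`, and distinct timestamps per user (Spark's `rangeBetween` would group tied timestamps together). `avg_songs_between_home` is hypothetical.

```python
from collections import Counter

def avg_songs_between_home(events):
    """Average number of NextSong events per (user, period) block."""
    by_user = {}
    for e in events:
        if e["page"] in ("NextSong", "Home"):
            by_user.setdefault(e["userId"], []).append(e)
    block_sizes = Counter()
    for user, user_events in by_user.items():
        period = 0
        # walk from the latest event backwards, like orderBy(desc('ts'))
        for e in sorted(user_events, key=lambda ev: ev["ts"], reverse=True):
            if e["page"] == "Home":
                period += 1
            else:
                block_sizes[(user, period)] += 1
    if not block_sizes:
        return 0.0
    return sum(block_sizes.values()) / len(block_sizes)
```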

In [None]:
# top 5 played artist
df.filter(df.page == 'NextSong') \
    .select('Artist') \
    .groupBy('Artist') \
    .agg({'Artist':'count'}) \
    .withColumnRenamed('count(Artist)', 'Artistcount') \
    .sort(desc('Artistcount')) \
    .show(5)

In [None]:
# top 5 played songs
df.filter(df.page == 'NextSong') \
    .select('song') \
    .groupBy('song') \
    .agg({'song':'count'}) \
    .withColumnRenamed('count(song)', 'SongCount') \
    .sort(desc('SongCount')) \
    .show(5)

In [None]:
# check a random user
df.select(["userId", "firstname", "page", "level", "song"]).where(df.userId == 98).collect()

In [None]:
# create a udf to flag Churned customers
flag_downgrade_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())

In [None]:
# create new column "churn"
df = df.withColumn("churn", flag_downgrade_event("page"))

In [None]:
df.head()

In [None]:
# window over each user's events from the latest backwards, so a running sum of
# churn gives phase=1 for every event up to and including the cancellation
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

In [None]:
# create new column phase
df = df.withColumn("phase", Fsum("churn").over(windowval))
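
Because the window runs from the latest event backwards, `phase` counts the cancellation events at or after each row. In practice this marks all of a churned user's events with 1 and everything else with 0. The plain-Python sketch below shows the same running sum for one user's events. It assumes dict events with `ts` and `page`; `phases` is hypothetical. The `Home` event after the cancellation in the test is contrived, added only to show a 0.

```python
def phases(user_events):
    """Return sorted (ts, phase) pairs: phase = cancellations at or after each event."""
    result = []
    running = 0
    for e in sorted(user_events, key=lambda ev: ev["ts"], reverse=True):
        running += 1 if e["page"] == "Cancellation Confirmation" else 0
        result.append((e["ts"], running))
    return sorted(result)
```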

User 39 played the largest number of songs. Let's check whether that matters by going through its records.

In [None]:
# check result
df.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(df.userId == "39").sort("ts").collect()

User 39 started on the free tier and became a paid member after listening to a fair number of songs.

In [None]:
# check the number of songs played per user
df.where(df.song.isNotNull()).groupBy(['churn','userId']) \
    .agg(count(df.song).alias('SongsPlayed')).sort('SongsPlayed', ascending=False).show()

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
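# split by gender: one row per user, churn = 1 if the user ever cancelled
# (dropDuplicates(["userId"]) would keep an arbitrary event, whose churn flag is almost always 0)
from pyspark.sql.functions import max as Fmax
df.filter(df.userId != '').groupBy('userId', 'gender').agg(Fmax('churn').alias('churn')) \
    .groupBy(['gender', 'churn']).count().show()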

In [None]:
df.dropDuplicates(['userId']).groupBy(['song','churn']).count().show()

In [None]:
df.printSchema()

In [None]:
# plot locations 
# On which days do people use sparkify more?
# What are the most active hours?
# Songs played per user group - churned/not churned
# Which days of the month are more active?
# Which subscription level is more likely to churn?
# Do activity times have effect over churn?
# https://hendra-herviawan.github.io/pyspark-groupBy-and-aggregate-functions.html
# https://github.com/ihsankose/Udacity-Capstone/blob/master/Sparkify.ipynb
# https://nbviewer.jupyter.org/github/elifinspace/sparkify/blob/master/Sparkify_final_.ipynb

1. Try to aggregate based on songs and month
2. Plot some EDA based on notes
3. Clean data set for modeling (refer to previous projects with ETL pipelines)
4. Model based on DataCamp course (if possible)

In [None]:
# plot location
plot = df.groupBy('location', "churn").count().dropDuplicates().sort('count', ascending=False).toPandas()

In [None]:
fig = px.bar(plot, x='location', y='count')
fig.show()

In [None]:
# create state column
df = df.withColumn("state", df.location.substr(-2,2))
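
`substr(-2, 2)` keeps only the last two characters of `location`. That is correct for a value like `City, CA`, but for a metropolitan area spanning several states it keeps only the last code. The sketch below assumes locations of the form `City, ST` or `City-City, ST1-ST2`; the example `Boston-Cambridge-Newton, MA-NH` is illustrative and `state_codes` is hypothetical.

```python
def state_codes(location):
    """Return all state codes from a 'City, ST' or 'City-City, ST1-ST2' location string."""
    suffix = location.rsplit(", ", 1)[-1]  # text after the last ", "
    return suffix.split("-")
```

Taking `state_codes(loc)[0]` would assign multi-state areas to their first listed state rather than the last; which choice is better depends on the analysis.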

In [None]:
df.select('state').show()

In [None]:
df.groupBy("state").count().sort("count", ascending=False).show(5)

In [None]:
# convert to pandas
state = df.groupBy("state", "churn").count().sort("count", ascending=False).toPandas()

In [None]:
fig = px.bar(state, x='state', y='count')
fig.show()

In [None]:
# check state by churn
churn = df.filter(df.churn == 1)
churn.groupBy("state", "churn").count().sort("count", ascending=False).show(5)

In [None]:
# plot churn by state
churned_state = churn.groupBy("state", "churn").count().sort("count", ascending=False).toPandas()
fig = px.bar(churned_state, x='state', y='count')
fig.show()

In [None]:
# churn by workday
temp = churn.groupBy('workday_').count().sort('count').toPandas()
fig = px.bar(temp, x='workday_', y='count', text="count")
fig.update_layout(title_text='What are the most active days for churn?')
fig.show()

In [None]:
churn.groupBy('song', "churn").count().dropDuplicates().sort('count', ascending=False).show()

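**No songs appear for churned users: the `churn` subset contains only the `Cancellation Confirmation` events themselves, which have no song.**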

In [None]:
# what are the most active days?
temp = df.groupBy('workday_').count().sort('count').toPandas()
fig = px.bar(temp, x='workday_', y='count', text="count")
fig.update_layout(title_text='What are the most active days?')
fig.show()

In [None]:
# What are the most active hours?
temp = df.groupBy('hour').count().sort('hour').toPandas()
fig = px.bar(temp, x='hour', y='count', text='count')
fig.update_layout(title_text='What are the most active hours?')
fig.show()

In [None]:
# What are the most active hours for churn?
temp = churn.groupBy('hour').count().sort('hour').toPandas()
fig = px.bar(temp, x='hour', y='count', text='count')
fig.update_layout(title_text='What are the most active hours for churn?')
fig.update_traces(texttemplate='%{text:.2s}', textposition='outside')
fig.show()

In [None]:
# What are the most active days during the month?
temp = df.groupBy('day').count().sort('day').toPandas()
fig = px.bar(temp, x='day', y='count', text='count')
fig.update_layout(title_text='What are the most active days during the month?')
fig.show()

In [None]:
# What are the most active days during the month for churn?
temp = churn.groupBy('day').count().sort('day').toPandas()
fig = px.bar(temp, x='day', y='count', text='count')
fig.update_layout(title_text='What are the most active days during the month (churn)?')
fig.show()

In [None]:
# How are events split by gender?
temp = df.groupBy('gender').count().sort('gender').toPandas()
fig = px.bar(temp, x='gender', y='count', text='count')
fig.update_layout(title_text='Gender split')
fig.show()

In [None]:
# How are cancellations split by gender?
temp = churn.groupBy('gender').count().sort('gender').toPandas()
fig = px.bar(temp, x='gender', y='count', text='count')
fig.update_layout(title_text='Gender split (churn)')
fig.show()

Even though female users account for more events overall, more men than women cancelled their subscription.

### Based on the EDA I would choose the below features:
- state
- workday
- day
- songs played
- gender
- level

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [None]:
df.printSchema()

In [None]:
df.select('gender', 'userId', 'workday_', 'day', 'churn', 'state').show()

In [None]:
# create state column
# df = df.withColumn("state", df.location.substr(-2,2))

# create udf (user defined functions) for workdays and hour columns
#day_name= ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday','Sunday']
#day_ind = [1,2,3,4,5,6,7]
#get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).hour, IntegerType() )
#get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).day, IntegerType() )
#get_wkday = udf(lambda x: day_name[datetime.datetime.fromtimestamp(x/1000).weekday()] )
#get_month = udf(lambda x: datetime.datetime.fromtimestamp(x/1000).month, IntegerType() )

# create new columns based on ts column for analyses
#df = df.withColumn('hour', get_hour('ts'))
#df = df.withColumn('day', get_day('ts'))
#df = df.withColumn('workday', get_wkday('ts'))
#df = df.withColumn('month', get_month('ts'))

# check the number of songs played per user
#df.where(df.song != 'null').groupBy(['churn','userId']) \
    #.agg(count(df.song).alias('SongsPlayed')).sort('SongsPlayed', ascending=False).show()

### Feature 1 - gender

In [None]:
# gender column
gender = df.dropDuplicates(['userId']).sort('userId').select(['userId','gender'])
gender = gender.filter(gender["userId"] != "")
gender = gender.replace(['M','F'], ['1', '0'], 'gender')
gender = gender.withColumn('gender', gender.gender.cast("int"))

In [None]:
gender.take(1)

### Feature 2 - workday

In [None]:
# convert workdays in digits
get_wkday = udf(lambda x: day_ind[datetime.datetime.fromtimestamp(x/1000).weekday()], IntegerType() )
df = df.withColumn('workday', get_wkday('ts'))

In [None]:
df.take(1)

In [None]:
# workday column
workday = df.dropDuplicates(['userId']).sort('userId').select(['userId','workday'])
workday = workday.filter(workday["userId"] != "")

In [None]:
workday.take(1)

### Feature 3 - day

In [None]:
# day column
day = df.dropDuplicates(['userId']).sort('userId').select(['userId','day'])
day = day.filter(day["userId"] != "")

In [None]:
day.take(1)

### Feature 4 - state

In [None]:
# state column
state = df.dropDuplicates(['userId']).sort('userId').select(['userId','state'])
state = state.filter(state["userId"] != "")

In [None]:
state.take(1)

### Feature 5 - SongsPlayed

In [None]:
# songs played column
songs_played = df.where(df.song.isNotNull()).groupby('userId') \
    .agg(count(df.song).alias('SongsPlayed')).orderBy('userId').select(['userId','SongsPlayed'])
songs_played = songs_played.filter(songs_played["userId"] != "")

In [None]:
songs_played.take(1)

### Features data frame

In [None]:
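# create features data frame: one row per user, churn = 1 if the user ever cancelled
# (dropDuplicates would keep an arbitrary event, whose churn flag is almost always 0)
from pyspark.sql.functions import max as Fmax
model_data = df.groupBy('userId').agg(Fmax('churn').alias('churn'))
model_data = model_data.filter(model_data["userId"] != "")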

In [None]:
model_data.take(1)

In [None]:
# convert column churn to label
# model_data = model_data.withColumn("label", model_data.churn)
model_data = model_data.withColumnRenamed("churn", "label")

In [None]:
model_data.take(1)

In [None]:
# join all features into one data frame
model_data = model_data.join(gender, 'userId')
model_data = model_data.join(workday, 'userId')
model_data = model_data.join(day, 'userId')
#model_data = model_data.join(state, 'userId')
model_data = model_data.join(songs_played, 'userId')

In [None]:
model_data.show()

In [None]:
# check column types
model_data.dtypes

### Convert columns to integers for modeling

In [None]:
# convert userId from string to integer
model_data = model_data.withColumn("userId", model_data.userId.cast("integer"))

In [None]:
model_data.dtypes

In [None]:
# remove any missing values
model_data = model_data.filter("userId is not NULL \
and label is not NULL \
and gender is not NULL \
and workday is not NULL \
and day is not NULL \
and SongsPlayed is not NULL")

In [None]:
# variant of the filter above, only for when the state join is enabled
# (churn was renamed to label earlier, so label is checked instead)
model_data = model_data.filter("userId is not NULL \
and label is not NULL \
and gender is not NULL \
and workday is not NULL \
and day is not NULL \
and state is not NULL \
and SongsPlayed is not NULL")

In [None]:
# Create a StringIndexer
state_indexer = StringIndexer(inputCol ="state", outputCol ="state_index")

In [None]:
# Create a OneHotEncoder
state_encoder = OneHotEncoder(inputCol="state_index", outputCol="state_fact")
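
`StringIndexer` turns each state into an integer index, and `OneHotEncoder` turns that index into a vector. Spark's encoder defaults to `dropLast=True`, so the last category becomes the all-zero vector and the output has one fewer slot than there are categories. The sketch below shows that encoding as a dense list for readability, whereas Spark produces sparse vectors. `one_hot` is hypothetical.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot encoding of a category index, optionally dropping the last category."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:  # with drop_last, the last index maps to all zeros
        vector[index] = 1.0
    return vector
```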

In [None]:
model_data.take(1)

In [None]:
# Make a VectorAssembler
vec_assembler = VectorAssembler(inputCols=["gender", "state_fact", "workday", "day", "SongsPlayed"], outputCol="features")

ETL pipeline

1. Load data
2. Create workdays/days columns
3. Create features
4. Join data

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
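
F1 suits an imbalanced label because a model that never predicts churn can still score high accuracy. The sketch below computes F1 for the churned class from parallel label and prediction lists; `f1_score` is a hypothetical helper. Spark's `MulticlassClassificationEvaluator(metricName="f1")` reports a label-weighted F1 across both classes, which differs from this positive-class version.

```python
def f1_score(labels, predictions, positive=1):
    """F1 score for the positive (churned) class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == positive and p == positive)
    fp = sum(1 for y, p in pairs if y != positive and p == positive)
    fn = sum(1 for y, p in pairs if y == positive and p != positive)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)
```

Predicting "no churn" for every user in `[1, 0, 1, 0, 0]` is 60% accurate, but its F1 is 0.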

### 1. Train Test split

In [None]:
# split the data
train, test = model_data.randomSplit([0.6, 0.4])

### 2. Build Pipeline

In [None]:
# (no %%timeit here: variables assigned inside a %%timeit cell are not kept afterwards)

# vector for features (state is not joined into model_data, so it is left out here)
vec_assembler = VectorAssembler(inputCols=["gender", "workday", "day", "SongsPlayed"],
                                outputCol="features")

# standard scaler
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True)

# Logistic regression on the scaled features; model_data already has a numeric "label" column
lr = LogisticRegression(featuresCol="features_scaled", labelCol="label",
                        maxIter=10, regParam=0.0, elasticNetParam=0)

# Random forest on the same features (not used in a pipeline yet)
rf = RandomForestClassifier(featuresCol="features_scaled", labelCol="label",
                            numTrees=3, maxDepth=2, seed=42)

# Build pipeline
from pyspark.ml import Pipeline
pipeline_lr = Pipeline(stages=[vec_assembler, scaler, lr])

### 3. Tune model

In [None]:
# Build parameter grid 
paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam,[0.0, 0.1, 0.01]) \
    .build()

# Build Cross Validator
crossval_lr = CrossValidator(estimator=pipeline_lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)

In [None]:
# vector problem
cvModel_lr = crossval_lr.fit(train)

### 4. Compute Accuracy of the best model

In [None]:
# Make the pipeline
churn_pipe = Pipeline(stages=[state_indexer, state_encoder, vec_assembler])

#flights_pipe = Pipeline(stages=[dest_indexer, dest_encoder, carr_indexer, carr_encoder, vec_assembler])

In [None]:
# Fit and transform the data
piped_data = churn_pipe.fit(model_data).transform(model_data)

In [None]:
piped_data.dtypes

In Spark it's important to split the data only after all the transformations have been applied. Operations like StringIndexer assign indices based on how often each string occurs in the data they are fitted on, so fitting them on different subsets can map the same string to different indices.
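
The sketch below shows frequency-based indexing, roughly what `StringIndexer` does with its default `frequencyDesc` ordering. Ties are broken alphabetically here for determinism. The same two strings get opposite indices depending on which subset the indexer was fitted on. `frequency_index` is hypothetical.

```python
from collections import Counter

def frequency_index(values):
    """Map strings to indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: i for i, value in enumerate(ordered)}
```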

In [None]:
# Split the data into training and test sets
training, test = piped_data.randomSplit([.6, .4])

In [None]:
training.take(1)

In [None]:
# Create a LogisticRegression Estimator
lr = LogisticRegression()

In [None]:
# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [None]:
# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()
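
`ParamGridBuilder` builds the cartesian product of all added values, and `CrossValidator` trains one model per parameter map per fold. It then refits the best parameter map on the full training set. The sketch below counts the fits. It assumes the default `numFolds=3` and that `np.arange(0, .1, .01)` yields the ten values 0.00 to 0.09. The helpers `grid_size` and `cv_fits` are hypothetical.

```python
from itertools import product

def grid_size(*param_values):
    """Number of parameter maps in a full grid (cartesian product of the value lists)."""
    return sum(1 for _ in product(*param_values))

def cv_fits(n_param_maps, num_folds=3):
    """Models trained by k-fold cross-validation: one per map per fold, plus a final refit."""
    return n_param_maps * num_folds + 1
```

For this grid that is 20 parameter maps and 61 logistic regression fits. The earlier three-value `regParam` grid needs 10.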

In [None]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=lr,
            estimatorParamMaps=grid,
            evaluator=evaluator
            )

In [None]:
# Fit cross validation models
models = cv.fit(training)

In [None]:
# Extract the best model
best_lr = models.bestModel

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.