# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [None]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf, isnull
from pyspark.sql.types import IntegerType
from pyspark.sql import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier,LinearSVC, GBTClassifier, DecisionTreeClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import seaborn as sns
import datetime
from time import time

In [93]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

### Preprocessing Steps

I will perform the steps below in the preprocessing part to explore and analyze the dataset more effectively.

1. Loading the data from the JSON file and converting it to a pandas dataframe.
2. Dealing with missing values.
    1. Analyzing the values in each column and removing empty entries, '', in userId.
    2. Identifying the missing features and removing or replacing them.
3. Converting the timestamp columns to a more readable format.
4. Converting the gender information to a binary variable (Male = 0, Female = 1).

After performing these steps, *Churn* will be defined according to the Cancellation Confirmation event. In the churn column, 1 indicates that the user has churned and 0 otherwise.


#### Loading The Data

In [95]:
data_path = 'mini_sparkify_event_data.json'
df = spark.read.json(data_path)
# df.persist()
# See the frame schema
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



Since I am more familiar with pandas dataframes, I will convert the Spark dataframe to a pandas dataframe.

[1] https://stackoverflow.com/questions/29226210/what-is-the-spark-dataframe-method-topandas-actually-doing

[2] https://www.oreilly.com/library/view/pyspark-cookbook/9781788835367/fe13b699-9b45-4295-a0cb-6375fa98a3e3.xhtml

In [None]:
df_pandas = df.toPandas()
df_pandas.head()

In [None]:
# Inspect column types and non-null counts
df_pandas.info()

#### Dealing With Missing Values

Before proceeding with the data cleaning, let's look at the NaN counts in the dataset columns.

In [None]:
df_pandas.isnull().sum()

In [None]:
df = df.dropna(how = 'any', subset = ['userId', 'sessionId'])

In [None]:
df.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
df_pandas = df.toPandas()

In [None]:
df_pandas.isnull().sum()

It is seen that *FirstName, Gender, LastName, Location, Registration, and UserAgent* have the same number of missing values.

In [None]:
df_pandas['auth'].unique()

In [None]:
df_pandas['gender'].unique()

It is seen that we have None values in the gender column.

In [None]:
df_pandas['level'].unique()

In [None]:
df_pandas['method'].unique()

In [None]:
df_pandas['page'].unique()

In [None]:
#df_pandas['registration'].unique()
df_pandas['status'].unique()

In [None]:
df_pandas['userId'].unique()

It is seen that we have the empty string **''** as an entry in the userId column. So we can remove those rows first and then check the None values in the gender column.

In [None]:
df_pandas[df_pandas['userId'] == '']['userId'].count()

Let's check the count in the Spark dataframe.

[1] https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

In [None]:
df.filter(df.userId == '').count()

The next step is removing the rows with an empty **''** userId.

In [None]:
df = df.filter(df.userId != '')

In [None]:
df.filter(df.userId == '').count()

In [None]:
df_pandas = df.toPandas()
df_pandas.isnull().sum()

Let's observe the missing values after the conversion.

In [None]:
df_pandas[df_pandas.isnull().any(axis=1)]

Since the analysis will be based on churn, there is no need to drop the None values in the artist, length, and song columns. They indicate events where the user is logged in but not listening to a song, so there is no length value.

In [None]:
print('Unique values in auth  column : \n{}'.format(df_pandas['auth'].unique()))
print('Unique values in gender  column : \n{}'.format(df_pandas['gender'].unique()))
print('Unique values in level  column : \n{}'.format(df_pandas['level'].unique()))
print('Unique values in method  column : \n{}'.format(df_pandas['method'].unique()))
print('Unique values in page  column : \n{}'.format(df_pandas['page'].unique()))
print('Unique values in status  column : \n{}'.format(df_pandas['status'].unique()))

After dropping the '' values in the userId column, the missing values in the firstName, gender, lastName, location, registration, and userAgent columns are gone as well, since their missing count was the same as the number of empty userId entries.

Comparing the unique values before and after cleaning the userId column, we have lost two values in the auth column, Logged out and Guest. Additionally, the page column no longer contains the Login, Registration, and Submit Registration values.

This is logical, because rows with an *empty* user ID most likely belong to guests or logged-out users. The same holds for the page column, because the lost values are pages visited by guests and logged-out users.

It is also logical that the None values in the gender column are gone, because guest or logged-out users do not have gender information.



#### Converting The Timestamp

[1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html

[2] https://stackoverflow.com/questions/13890935/does-pythons-time-time-return-the-local-or-utc-timestamp

[3] https://stackoverflow.com/questions/2265357/parse-date-string-and-change-format

In [None]:
df_pandas['ts'].head()

In [None]:
get_time = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))
df = df.withColumn("time", get_time(df.ts))

In order to use a Python function in Spark, I have used a udf.

[1] https://changhsinlee.com/pyspark-udf/

[2] https://docs.databricks.com/spark/latest/spark-sql/udf-python.html
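
One detail worth keeping in mind: `datetime.fromtimestamp` without a time zone argument uses the local time zone of the machine, so the strings produced by the udf can differ between a laptop and a cluster. The sketch below shows the same millisecond-to-string conversion pinned to UTC in plain Python. The function name `ms_to_utc_string` and the sample value are illustrative assumptions, not part of the dataset.

```python
from datetime import datetime, timezone

def ms_to_utc_string(ts_ms):
    """Convert a millisecond epoch timestamp to 'YYYY-MM-DD HH:MM:SS' in UTC."""
    seconds = ts_ms / 1000.0
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Hypothetical value in the same millisecond format as the ts column
print(ms_to_utc_string(1538352117000))  # 2018-10-01 00:01:57
```

The same function body could be wrapped in `udf(...)` if a time-zone-independent column is preferred.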

In [None]:
df_pandas = df.toPandas()

In [None]:
df_pandas.head()

Since the registration column is also in timestamp format, I will apply the same conversion to it.

In [None]:
df = df.withColumn('registration_time', get_time(df.registration))

In [None]:
df_pandas = df.toPandas()

In [None]:
df_pandas.head()

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

Customer churn occurs when customers/subscribers stop doing business with a company or service. We define a user as churned if there is a Cancellation Confirmation event, which can happen for both free and paid users. Tracking the downgrade events of paid users can also give insight into customer churn.


[1] https://blog.recurly.com/better-way-to-calculate-your-churn-rate


First look for cancellation confirmation events to define Churn.

In [None]:
# create churn label
flag_cancellation = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())

# Apply it to the dataframe
df = df.withColumn('churn', flag_cancellation('page'))

# label user who churned
windowval = Window.partitionBy('userId')

# Apply it to the dataframe
df = df.withColumn('churn', max('churn').over(windowval))

In [None]:
df_pandas = df.toPandas()

Now, look for downgrade events.

In [None]:
# create downgrade label
flag_downgrade = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())

# Apply it to the dataframe
df = df.withColumn("downgrade", flag_downgrade("page"))

# label user who've ever downgraded
windowval = Window.partitionBy('userId')

# Apply it to the dataframe
df = df.withColumn('downgrade', max('downgrade').over(windowval))

In [None]:
df_pandas = df.toPandas()

In [None]:
df_pandas.sample(5)

Let's observe the spark dataframe

In [None]:
df.select('userId').dropDuplicates().sort('userId').show(5)

In [None]:
df.select(['userId', 'churn', 'downgrade']).dropDuplicates().show(20)

Now we have defined **churn** and **downgrade** events as separate columns.
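
The labelling logic (flag each event, then give every row of a user the maximum flag seen for that user) can be sketched in plain Python. This is only an illustration of the idea behind the window function, and the tiny `events` list is a hypothetical example, not data from the file.

```python
from collections import defaultdict

# Hypothetical event log: (userId, page) pairs
events = [
    ("10", "NextSong"),
    ("10", "Cancellation Confirmation"),
    ("20", "NextSong"),
    ("20", "Thumbs Up"),
]

def label_churn(events, churn_page="Cancellation Confirmation"):
    """Return {userId: 1 if the user ever visited churn_page, else 0}."""
    churn = defaultdict(int)
    for user_id, page in events:
        flag = 1 if page == churn_page else 0
        # Taking the maximum per user plays the role of the max over the userId window
        churn[user_id] = max(churn[user_id], flag)
    return dict(churn)

print(label_churn(events))  # {'10': 1, '20': 0}
```

Because the flag is 0 or 1, the per-user maximum is 1 exactly when at least one matching event exists, which is why a single churn event marks all of the user's rows.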


### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

##### Gender distribution on churn event

First, we can explore the gender distribution of the churn event to see whether gender has an effect.

[1] https://medium.com/@andykashyap/top-5-tricks-to-make-plots-look-better-9f6e687c1e08

In [None]:
df.dropDuplicates(["userId", "gender"]).groupby(["churn", "gender"]).count().sort("churn").show()

In [None]:
df_pandas_gender = df.dropDuplicates(["userId", "gender"]).groupby(["churn", "gender"]).count().sort("churn").toPandas()

In [None]:
plt.figure(figsize=(10,8))
ax = sns.barplot(x = 'churn', y = 'count', hue = 'gender', data = df_pandas_gender,  palette = "Blues_d")
plt.xlabel('Churn', fontsize = 14)
plt.ylabel('Count', fontsize = 14)
plt.legend(title = 'Gender', loc = 'best', ncol = 2)

plot = plt.title('Gender Distribution of Churn Event', fontsize = 14)


It is seen that male users are slightly more likely to churn than female users.

##### Number of played/listened songs

The number of songs played per session by gender might be a good indicator. It may help explain why male users are more likely to churn.

[1] https://www.datacamp.com/community/blog/seaborn-cheat-sheet-python

[2] https://seaborn.pydata.org/generated/seaborn.violinplot.html

In [None]:
df_pandas_song = df.where('page == "NextSong"').groupby(['churn', 'userId', 'sessionId','gender']).count()\
    .groupby(['churn', 'userId', 'gender']).agg({'count':'avg'})\
    .withColumnRenamed('avg(count)', 'avg_songs_played')\
    .toPandas()

In [None]:
plt.figure(figsize=(10,8))
# pass x as a keyword argument: recent seaborn versions no longer accept it positionally
ax = sns.violinplot(x = 'churn', y = 'avg_songs_played', hue = 'gender', data = df_pandas_song, palette="Set2", split = True, inner="quartile")
plt.xlabel('churn', fontsize = 12)
plt.ylabel('Average count of listened songs per session', fontsize = 12)
plt.legend(title = 'Gender', loc = 'best', ncol = 2)
plt.title('Listened music distribution among gender')
sns.despine(ax=ax);

It is seen that churned users listened to fewer songs per session than the others. Also, churned female users listened to more songs on average than churned male users.

###### Notes on Violin Plot

A violin plot plays a similar role as a box and whisker plot. It shows the distribution of quantitative data across several levels of one (or more) categorical variables such that those distributions can be compared. Unlike a box plot, in which all of the plot components correspond to actual datapoints, the violin plot features a kernel density estimation of the underlying distribution.

Violin plots have many of the same summary statistics as box plots:
* The white dot represents the median
* The thick gray bar in the center represents the interquartile range
* The thin gray line represents the rest of the distribution, except for points that are determined to be “outliers” using a method that is a function of the interquartile range.

On each side of the gray line is a kernel density estimation to show the distribution shape of the data. Wider sections of the violin plot represent a higher probability that members of the population will take on the given value; the skinnier sections represent a lower probability.

[1] https://mode.com/blog/violin-plot-examples

[2] https://blog.bioturing.com/2018/05/16/5-reasons-you-should-use-a-violin-graph/
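
To make the two ingredients of a violin plot concrete, the following sketch computes the quartiles and a Gaussian kernel density estimate for a small sample in plain Python. The sample values and the fixed bandwidth of 10 are assumptions chosen for illustration. Seaborn selects its bandwidth automatically, so its curves will not match this sketch exactly.

```python
import math
import statistics

def gaussian_kde(samples, bandwidth):
    """Return a function that estimates the density of samples with Gaussian kernels."""
    n = len(samples)
    norm = 1.0 / (n * bandwidth * math.sqrt(2 * math.pi))

    def density(x):
        return norm * sum(math.exp(-0.5 * ((x - s) / bandwidth) ** 2) for s in samples)

    return density

# Hypothetical "average songs per session" values for a handful of users
songs = [20, 35, 40, 45, 50, 55, 60, 120]

q1, median, q3 = statistics.quantiles(songs, n=4)
density = gaussian_kde(songs, bandwidth=10.0)

print("quartiles:", q1, median, q3)
print("density near the bulk (x=45):", density(45))
print("density in the tail (x=100):", density(100))
```

The density is larger where many observations sit close together, which is what the wider sections of the violin show.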

In [None]:
df_pandas_likes = df.where('page == "NextSong" OR page == "Thumbs Up"').groupby(['userId', 'churn', 'gender', 'page']).count().toPandas()
df_pandas_likes = df_pandas_likes.pivot_table(index=['userId','churn','gender'], values='count', columns='page').reset_index()

In [None]:
plt.figure(figsize=(10,8))
ax = sns.violinplot(data = df_pandas_likes, x = 'churn', y = 'NextSong', hue = 'gender', split = True,  inner = "quartile", palette="Set2")
plt.xlabel('Churn Event')
plt.ylabel('Played songs count according to NextSong Event')
plt.legend(title = 'Gender', loc = 'best')
title = plt.title('Total listening song distribution effect on churn by gender')

It seems that the total number of songs listened to does not differ much between churned and non-churned users. However, female users show higher typical values and a wider distribution than male users.

In [None]:
plt.figure(figsize=(10,8))
ax = sns.violinplot(data = df_pandas_likes, x = 'churn', y = 'Thumbs Up', hue = 'gender', split = True,  inner = "quartile", palette="Set3")
plt.xlabel('Churn event')
plt.ylabel('Liked song count according to ThumbsUp event')
plt.legend(title='Gender', loc='best')
title = plt.title('Total liked song distribution effect on churn by gender')

For the liked song distribution as well, churned and non-churned users have similar characteristics.

###### Churned user status

It might be a good indicator to see which payment level the churned users had.

In [None]:
df_pandas_level = df.filter('page == "Cancellation Confirmation"').groupby('level').count().toPandas()

In [None]:
plt.figure(figsize=(10,8))
ax = sns.barplot(data = df_pandas_level, x = 'level', y = 'count', palette="Set1")
plt.xlabel('Payment level', fontsize = 12)
plt.ylabel('')
plt.title('Churned user payment level distribution', fontsize = 14)
sns.despine(ax=ax);

It is seen that more paid users churned than free users.

###### Page distribution

Let's see the page distribution of churned and current users to understand the differences between them.

In [None]:
churned_user = df_pandas[df_pandas.churn == 1].groupby(['page'])['userId'].count()
churned_user = churned_user /churned_user.sum()*100

active_user = df_pandas[df_pandas.churn == 0].groupby(['page'])['userId'].count()
active_user = active_user /active_user.sum()*100

users_df = pd.DataFrame({'Cancelled': churned_user,'Active users':active_user})

In [None]:
ax = users_df.plot(kind='barh', figsize=(12,10), cmap = "plasma");
ax.set_xlabel('Percent of event occurrence (%)')
ax.set_title('Percent of event occurrence for active and cancelled users');

Since NextSong dominates the figure and hides the other events, I have decided to drop it.

In [None]:
churned_user = df_pandas[df_pandas.churn == 1].groupby(['page'])['userId'].count().drop('NextSong')
# Define percentage to visualize better
churned_user = churned_user /churned_user.sum()*100

active_user = df_pandas[df_pandas.churn == 0].groupby(['page'])['userId'].count().drop('NextSong')
# Define percentage to visualize better
active_user = active_user /active_user.sum()*100

users_df = pd.DataFrame({'Cancelled': churned_user,'Active users':active_user})

In [None]:
ax = users_df.plot(kind='barh', figsize=(12,10), cmap = "plasma");
ax.set_xlabel('Percentage of event occurrence (%)')
ax.set_title('Percentage distribution of page events among the active and cancelled users');

##### Membership duration

Lastly, we can investigate the membership duration of users to see its effect on the churn event.

In [None]:
df_pandas_lifetime = df\
    .select('userId','registration','ts','churn') \
    .withColumn('lifetime',(df.ts-df.registration)) \
    .groupBy('userId','churn') \
    .agg({'lifetime':'max'}) \
    .withColumnRenamed('max(lifetime)','lifetime') \
    .select('userId', 'churn', (col('lifetime')/1000/3600/24).alias('lifetime')) \
    .toPandas()

In [None]:
ax = sns.boxplot(data = df_pandas_lifetime, y = 'churn', x = 'lifetime', orient = 'h', palette = "Set2")
plt.xlabel('Days since registration')
plt.ylabel('Has customer churned')
title = plt.title('Churned customers use the service for a shorter period of time')
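
The lifetime value is the gap between registration and the user's latest event, converted from milliseconds to days. A minimal plain-Python sketch of that arithmetic follows. The helper name `lifetime_days` and the timestamps are hypothetical.

```python
MS_PER_DAY = 1000 * 3600 * 24

def lifetime_days(registration_ms, event_timestamps_ms):
    """Days between registration and the user's latest event."""
    return (max(event_timestamps_ms) - registration_ms) / MS_PER_DAY

# Hypothetical user: registered at some instant, last seen ten days later
registration = 1_538_352_000_000
events = [registration + 3 * MS_PER_DAY, registration + 10 * MS_PER_DAY]
print(lifetime_days(registration, events))  # 10.0
```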

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

###### Dataset Features:
In the dataset, we have the features below:

<u>Categorical features</u>
* Currently listening artists
* Authorization
* First/Last name of user
* Gender of user
* The payment level of membership (Paid/Free)
* Location
* Method
* Current page
* Currently listening song
* User agent, (MacOs, Windows, etc.)

<u>Numerical features</u>
* Item in session
* Length
* Registration date in timestamp format
* SessionId
* Status
* Time as ts in timestamp format
* Userid


For the analysis, we will add the features below, based on the data exploration performed in the previous section.

1. Gender feature as numerical/binary variable
2. Number of played/listened songs
3. Number of liked and disliked songs according to thumbs up/down
4. Number of songs added to playlist
5. Number of songs listened per session
6. Number of listened artists
7. Number of added friends
8. Total time duration/length of listening events
9. Membership duration
10. Churn label
11. Downgrade label




In [None]:
# Gender feature as numerical/binary variable
feature_1 = df.select("userId", "gender").dropDuplicates().replace(['M', 'F'], ['0', '1'], 'gender').select('userId', col('gender').cast('int'))
feature_1.show(5)

In [None]:
# Number of played/listened songs (NextSong events only, so other page events are not counted)
feature_2 = df.where(df.page == 'NextSong').select('userID','song').groupBy('userID').count().withColumnRenamed('count', 'total_songs')
feature_2.show(5)

In [None]:
# Number of liked songs (Thumbs Up events)
feature_3 = df.select('userID','page').where(df.page == 'Thumbs Up').groupBy('userID').count().withColumnRenamed('count', 'thumbs_up')
feature_3.show(5)

In [None]:
# Number of disliked songs (Thumbs Down events)
feature_4 = df.select('userID','page').where(df.page == 'Thumbs Down').groupBy('userID').count().withColumnRenamed('count', 'thumbs_down')
feature_4.show(5)

In [None]:
# Number of songs added to playlist
feature_5 = df.select('userID','page').where(df.page == 'Add to Playlist').groupBy('userID').count().withColumnRenamed('count', 'add_to_playlist')
feature_5.show(5)

In [None]:
# Number of songs listened per session
feature_6 = df.where('page == "NextSong"').groupby(['userId', 'sessionId']).count().groupby(['userId']).agg({'count':'avg'}).withColumnRenamed('avg(count)', 'avg_songs_played')
feature_6.describe().show()

In [None]:
# Number of listened artists
feature_7 = df.filter(df.page=="NextSong").select("userId", "artist").dropDuplicates().groupby("userId").count().withColumnRenamed("count", "artist_count")
feature_7.show(5)


In [None]:
# Number of added friends
feature_8 = df.select('userID','page').where(df.page == 'Add Friend').groupBy('userID').count().withColumnRenamed('count', 'add_friend') 
feature_8.show(5)

In [None]:
# Total time duration/length of listening events
feature_9 = df.select('userID','length').groupBy('userID').sum().withColumnRenamed('sum(length)', 'listen_time')
feature_9.show(5)


In [None]:
# Membership duration
feature_10 = df.select('userId','registration','ts').withColumn('lifetime',(df.ts-df.registration)).groupBy('userId') \
    .agg({'lifetime':'max'}).withColumnRenamed('max(lifetime)','lifetime').select('userId', (col('lifetime')/1000/3600/24).alias('lifetime'))
feature_10.show(5)

In [None]:
# Churn label
churn_label = df.select('userId', col('churn').alias('label')).dropDuplicates()
churn_label.show(5)

In [None]:
# Downgrade label
downgrade_label = df.select('userId', col('downgrade').alias('downgrade_label')).dropDuplicates()
downgrade_label.show(5)

In [None]:
data = feature_1.join(feature_2,'userID','outer') \
    .join(feature_3,'userID','outer') \
    .join(feature_4,'userID','outer') \
    .join(feature_5,'userID','outer') \
    .join(feature_6,'userID','outer') \
    .join(feature_7,'userID','outer') \
    .join(feature_8,'userID','outer') \
    .join(feature_9,'userID','outer') \
    .join(feature_10,'userID','outer') \
    .join(churn_label,'userID','outer') \
    .join(downgrade_label,'userID','outer') \
    .drop('userID') \
    .fillna(0)

data.show(5)

In [69]:
# Save as CSV using the Spark writer (this creates a directory of part files)
data.write.save('SparkFile_5.CSV', format='csv', header=True)

In [None]:
data_pandas = data.toPandas()
data_pandas.head(10)

In [71]:
data_pandas.to_csv('PandasFile_61.CSV', index=False)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize. 

##### Notes on F1 Score

According to Wikipedia, the F1 score is the harmonic mean of precision and recall. It reaches its best value at 1 and its worst at 0. But first let's define precision and recall:

* Precision is the number of correct positive results divided by the number of all predicted positive results. It describes how precise the model is: out of those predicted positive, how many are actually positive. Precision is a good measure when the cost of a false positive is high, for instance in email spam detection. There, a false positive means that a non-spam email (actual negative) has been identified as spam (predicted positive). The user might lose important emails if the precision of the spam detection model is not high.

* Recall is the number of correct positive results divided by the number of positive results that should have been returned. It measures how many of the actual positives the model captures by labeling them as positive (true positives). By the same reasoning, recall is the metric to use for selecting the best model when there is a high cost associated with a false negative.


Accuracy is one of the more obvious metrics: it is the fraction of all cases that are correctly identified. It is most used when all the classes are equally important.

Because it combines precision and recall, the F1 score gives a better measure of the incorrectly classified cases than accuracy. The differences in use between accuracy and the F1 score can be summarized as below:

1. Accuracy is used when the true positives and true negatives are more important, while the F1 score is used when the false negatives and false positives are crucial.

2. Accuracy can be used when the class distribution is similar, while the F1 score is a better metric when the classes are imbalanced, as in this dataset.

3. In most real-life classification problems, imbalanced class distribution exists and thus the F1 score is a better metric to evaluate our model on.
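
The definitions above can be checked with a few lines of plain Python. The confusion-matrix counts below are hypothetical and only illustrate how accuracy can look good on imbalanced data while the F1 score of the churn class stays low.

```python
def precision_recall_f1(tp, fp, fn):
    """Precision, recall and F1 for the positive class from confusion-matrix counts."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    if precision + recall == 0:
        return precision, recall, 0.0
    f1 = 2 * precision * recall / (precision + recall)  # harmonic mean
    return precision, recall, f1

def accuracy(tp, fp, fn, tn):
    """Fraction of all cases that are classified correctly."""
    return (tp + tn) / (tp + fp + fn + tn)

# Hypothetical imbalanced test set: 10 churned users, 90 who stayed.
# A model that predicts "stayed" for everyone:
print(accuracy(tp=0, fp=0, fn=10, tn=90))      # 0.9
print(precision_recall_f1(tp=0, fp=0, fn=10))  # (0.0, 0.0, 0.0)

# A model that finds 6 of the 10 churners at the cost of 4 false alarms:
print(accuracy(tp=6, fp=4, fn=4, tn=86))       # 0.92
print(precision_recall_f1(tp=6, fp=4, fn=4))
```

The first model reaches 90% accuracy without identifying a single churned user, which is exactly the situation the F1 score is meant to expose.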


[1] https://towardsdatascience.com/accuracy-precision-recall-or-f1-331fb37c5cb9

[2] https://adamyedidia.files.wordpress.com/2014/11/f_score.pdf

[3] https://medium.com/analytics-vidhya/accuracy-vs-f1-score-6258237beca2

In [None]:
data_pandas.columns

We will use the saved features as the inputs, except the churn event, which is the label. The churn label is what we want to predict from the other features.

After defining the features, we will create the feature vector and perform transformation.

In [None]:
# The input features are all features except the Churn Label and the output is the churn event.

features_labels = ['gender', 'total_songs', 'thumbs_up', 'thumbs_down', 'add_to_playlist',
                   'avg_songs_played', 'artist_count', 'add_friend', 'listen_time', 'lifetime', 'downgrade_label' ]

# Define the vector assembler for all input columns
features_vector  = VectorAssembler(inputCols = features_labels, outputCol = "features")

input_data = features_vector.transform(data)

As a next step, we will perform standardization. Without standardization, variables with a larger range effectively get more weight in the analysis. Typical data standardization procedures equalize the range and/or data variability. Therefore we standardize the features by subtracting the mean and dividing by the standard deviation of each feature.

[1] https://spark.apache.org/docs/latest/ml-features

[2] https://scikit-learn.org/stable/modules/preprocessing.html
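
As a small illustration of what standardization does, the sketch below applies the subtract-the-mean, divide-by-the-standard-deviation rule to two hypothetical columns on very different scales. It uses the sample standard deviation, an assumption made here to mirror how I understand Spark's `StandardScaler` to compute it.

```python
import statistics

def standardize(values):
    """Subtract the mean and divide by the sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)
    return [(v - mean) / std for v in values]

# Hypothetical feature columns on very different scales
listen_time = [1000.0, 250000.0, 50000.0, 120000.0]
thumbs_up = [2.0, 40.0, 11.0, 25.0]

for name, column in [("listen_time", listen_time), ("thumbs_up", thumbs_up)]:
    scaled = standardize(column)
    print(name, [round(v, 2) for v in scaled])
```

After scaling, both columns have mean 0 and standard deviation 1, so neither dominates simply because of its units.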

In [None]:
# standard scaler
features_scaler = StandardScaler(inputCol = "features", outputCol="scaled_features", withMean=True)
features_scaler_fit = features_scaler.fit(input_data)
scaled_inputs = features_scaler_fit.transform(input_data)

After standardizing our features, we can divide the dataset into train, validation, and test sets using the randomSplit function.

randomSplit randomly splits the DataFrame according to the provided weights. Random splitting typically generates two sets: one used to train the ML algorithm (training set), and a second to check whether the training is working (test set). This is widely done and a very good idea, as it catches overfitting, which can otherwise make it seem like you have a great ML solution when it has effectively just memorized the answer for each data point and cannot interpolate or generalize.

One of the main issues is selecting the seed, which is set following reference 4.

[1] https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html

[2] https://stackoverflow.com/questions/40606456/randomsplit-dont-respect-specific-weights-pyspark

[3] https://stackoverflow.com/questions/24857650/issue-understanding-splitting-data-in-scala-using-randomsplit-for-machine-lear

[4] https://medium.com/@ODSC/properly-setting-the-random-seed-in-ml-experiments-not-as-simple-as-you-might-imagine-219969c84752
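
The sketch below illustrates the general idea of a seeded, weighted random split in plain Python: each row gets its own random draw, so the split sizes only approximate the weights, and the same seed reproduces the same split. It is not Spark's implementation, and the helper name `random_split` and the 200-row input are assumptions for illustration.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split using an independent random draw per row."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            if draw < bound:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point round-off in the last bound
            splits[-1].append(row)
    return splits

rows = list(range(200))  # hypothetical row ids
train_rows, rest_rows = random_split(rows, [0.8, 0.2], seed=45)
print(len(train_rows), len(rest_rows))
```

With a small dataset, the realized proportions can deviate noticeably from 80/20, which is worth checking before interpreting metrics on the smaller splits.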

In [None]:
# train test split
train, rest = scaled_inputs.randomSplit([0.8, 0.2], seed = 45)
validation, test = rest.randomSplit([0.5, 0.5], seed = 45)

In [None]:
# baseline model: predict churn (1) for every user
results_base_all_1 = test.withColumn('prediction', lit(1.0))

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Test set metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(results_base_all_1, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_base_all_1, {evaluator.metricName: "f1"})))

In [None]:
# baseline model: predict no churn (0) for every user
results_base_all_0 = test.withColumn('prediction', lit(0.0))

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Test set metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(results_base_all_0, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_base_all_0, {evaluator.metricName: "f1"})))
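
A note on reading these baseline numbers: as far as I understand, the `f1` metric of `MulticlassClassificationEvaluator` is an average of the per-class F1 scores weighted by class frequency, so a constant prediction of the majority class can still get a fairly high value. The plain-Python sketch below reproduces that kind of weighted F1 on hypothetical labels. It is an illustration of the idea, not Spark's code.

```python
from collections import Counter

def f1_for_class(labels, predictions, cls):
    """F1 score when cls is treated as the positive class."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == cls and p == cls)
    fp = sum(1 for y, p in pairs if y != cls and p == cls)
    fn = sum(1 for y, p in pairs if y == cls and p != cls)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)

def weighted_f1(labels, predictions):
    """Average of per-class F1 scores, weighted by how often each class occurs."""
    counts = Counter(labels)
    n = len(labels)
    return sum(counts[c] / n * f1_for_class(labels, predictions, c) for c in counts)

# Hypothetical test set: 8 users stayed (0), 2 churned (1)
labels = [0] * 8 + [1] * 2
print(weighted_f1(labels, [0] * 10))  # constant "no churn" baseline
print(weighted_f1(labels, [1] * 10))  # constant "churn" baseline
```

Any trained model should beat both constant baselines on this metric to be considered useful.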

As a next step, we will perform classification with different algorithms to observe the result. 

[1] https://spark.apache.org/docs/latest/ml-classification-regression.html

##### Gradient Boost and Random Forest


Boosting came out of the question of whether a weak learner can be modified to become better. A weak hypothesis or weak learner is defined as one whose performance is at least slightly better than random chance.
Gradient boosted trees generalize the idea behind AdaBoost to arbitrary differentiable loss functions.

AdaBoost, short for Adaptive Boosting, is the first practical boosting algorithm, proposed by Freund and Schapire in 1996. AdaBoost constructs a “strong” classifier as a linear combination of “simple”, “weak” classifiers. In other words, the main working principle is to convert a set of weak classifiers into a strong one. A weak classifier is one with less than 50% error over any distribution, and the strong classifier is a thresholded linear combination of the weak classifier outputs. AdaBoost works by weighting the observations, putting more weight on difficult-to-classify instances and less on those already handled well. New weak learners are added sequentially and focus their training on the more difficult patterns. Predictions are made by a majority vote of the weak learners’ predictions, weighted by their individual accuracy.

Gradient boosting involves three elements:
* A loss function to be optimized. The loss function used depends on the type of problem being solved. It must be differentiable, but many standard loss functions are supported and you can define your own. For example, regression may use a squared error and classification may use logarithmic loss
* A weak learner to make predictions. Decision trees are used as the weak learner in gradient boosting
* An additive model to add weak learners to minimize the loss function. Trees are added one at a time, and existing trees in the model are not changed. A gradient descent procedure is used to minimize the loss when adding trees.

GBT builds trees one at a time, where each new tree helps to correct errors made by the previously trained trees. A great application of GBM is anomaly detection in supervised learning settings where data is often highly unbalanced, such as DNA sequences, credit card transactions, or cybersecurity.
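
The additive idea can be sketched in a few lines of plain Python. The example below is a deliberately simplified gradient boosting regressor with squared-error loss, one-split trees (stumps) on a single feature, and a fixed learning rate. It is not what `GBTClassifier` does internally, and the data, function names, and parameter values are all assumptions for illustration.

```python
def fit_stump(xs, residuals):
    """Find the single threshold split that minimises the squared error on residuals."""
    best = None
    for threshold in sorted(set(xs))[:-1]:  # keep at least one point on the right side
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((r - left_mean) ** 2 for r in left)
                 + sum((r - right_mean) ** 2 for r in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean

def gradient_boost(xs, ys, n_trees, learning_rate=0.5):
    """Fit stumps one at a time, each on the residuals of the current model."""
    base = sum(ys) / len(ys)
    trees = []
    predictions = [base] * len(xs)
    for _ in range(n_trees):
        # For squared error, the negative gradient is simply the residual
        residuals = [y - p for y, p in zip(ys, predictions)]
        tree = fit_stump(xs, residuals)
        trees.append(tree)  # existing trees are never modified
        predictions = [p + learning_rate * tree(x) for p, x in zip(predictions, xs)]
    return lambda x: base + learning_rate * sum(tree(x) for tree in trees)

def mse(model, xs, ys):
    return sum((model(x) - y) ** 2 for x, y in zip(xs, ys)) / len(xs)

# Hypothetical one-dimensional training data
xs = [1, 2, 3, 4, 5, 6, 7, 8]
ys = [1, 1, 2, 2, 5, 5, 9, 9]

for n_trees in (0, 1, 10):
    print(n_trees, "trees, training MSE:", mse(gradient_boost(xs, ys, n_trees), xs, ys))
```

The training error shrinks as trees are added, which also hints at why boosting can overfit noisy data if the number of trees is not controlled.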

The differences between GBT and RF can be summarized as follows. Gradient boosting uses regression trees for prediction, whereas a random forest uses decision trees. Random forests are easy to parallelize, but boosted trees are hard to parallelize because the trees are built sequentially. A random forest overfits each tree to a sample of the training data and then reduces the overfitting by simply averaging the predictors. If you carefully tune the parameters, gradient boosting can result in better performance than random forests. However, gradient boosting may not be a good choice if you have a lot of noise, as it can result in overfitting. Boosted trees also tend to be harder to tune than random forests.

[1] https://machinelearningmastery.com/gentle-introduction-gradient-boosting-algorithm-machine-learning/

[2] https://medium.com/@aravanshad/gradient-boosting-versus-random-forest-cfa3fa8f0d80

[3] https://www.datasciencecentral.com/profiles/blogs/decision-tree-vs-random-forest-vs-boosted-trees-explained

In [None]:
# initialize the GBT classifier
gbt = GBTClassifier(maxIter = 10, seed = 45)

# set evaluator
f1_evaluator = MulticlassClassificationEvaluator(metricName='f1')

# build paramGrid
paramGrid = ParamGridBuilder().build()

gbt_crossvalidation = CrossValidator(estimator = gbt,
                                    estimatorParamMaps = paramGrid,
                                    evaluator = f1_evaluator,
                                    numFolds = 3)

Fit the training data:

In [None]:
start = time()
cvModel_gbt = gbt_crossvalidation.fit(train)
end = time()
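# a bare expression in the middle of a cell is not displayed, so print it
print('Average cross-validation F1: {}'.format(cvModel_gbt.avgMetrics))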
print('The training process took {} seconds'.format(end - start))
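
The `numFolds = 3` setting above means the training data is split into three parts, and each part is held out once while a model is fitted on the other two. Since the parameter grid is empty, `avgMetrics` should contain a single averaged F1 value. The sketch below shows the idea in plain Python; it is a simplification (Spark assigns rows to folds randomly, while this version uses a fixed pattern), and the helper names and the majority-class "model" are hypothetical.

```python
def k_fold_indices(n_rows, num_folds=3):
    """Yield (train_idx, test_idx) pairs; each row is held out exactly once."""
    indices = list(range(n_rows))
    for fold in range(num_folds):
        test_idx = indices[fold::num_folds]
        held_out = set(test_idx)
        train_idx = [i for i in indices if i not in held_out]
        yield train_idx, test_idx

def cross_val_score(xs, ys, fit, score, num_folds=3):
    """Average the score of `fit` over all folds."""
    scores = []
    for train_idx, test_idx in k_fold_indices(len(xs), num_folds):
        model = fit([xs[i] for i in train_idx], [ys[i] for i in train_idx])
        scores.append(score(model,
                            [xs[i] for i in test_idx],
                            [ys[i] for i in test_idx]))
    return sum(scores) / len(scores)

# Hypothetical stand-in model: always predict the most common training label.
def fit_majority(xs, ys):
    return max(set(ys), key=ys.count)

def accuracy_of_constant(label, xs, ys):
    return sum(y == label for y in ys) / len(ys)

xs = [0, 1, 2, 3, 4, 5]
ys = [0, 0, 0, 0, 0, 1]
print(cross_val_score(xs, ys, fit_majority, accuracy_of_constant))
```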

In [None]:
gbt_result = cvModel_gbt.transform(validation)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction")
print('Gradient Boosted Trees Metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(gbt_result, {evaluator.metricName: "accuracy"})))
print('F-1 Score: {}'.format(evaluator.evaluate(gbt_result, {evaluator.metricName: "f1"})))
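
As I understand the Spark documentation, the `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure: the F1 score of each label, weighted by how often that label occurs. The plain-Python sketch below illustrates that definition and why it can differ from accuracy; the function names and the toy labels are hypothetical, and the sketch is not a substitute for the evaluator itself.

```python
def accuracy(labels, preds):
    """Fraction of rows where the prediction equals the label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-label F1, averaged with weights equal to each label's frequency."""
    total = len(labels)
    score = 0.0
    for cls in set(labels):
        tp = sum(l == cls and p == cls for l, p in zip(labels, preds))
        fp = sum(l != cls and p == cls for l, p in zip(labels, preds))
        fn = sum(l == cls and p != cls for l, p in zip(labels, preds))
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = (2 * precision * recall / (precision + recall)
              if precision + recall else 0.0)
        score += f1 * (tp + fn) / total  # (tp + fn) is the label's row count
    return score

# Hypothetical unbalanced data: 8 non-churn users (0) and 2 churn users (1),
# scored against a model that predicts "no churn" for everyone.
labels = [0] * 8 + [1] * 2
preds = [0] * 10
print(accuracy(labels, preds), weighted_f1(labels, preds))
```

With an 80/20 split, predicting the majority class everywhere gives an accuracy of 0.8 but a weighted F1 of about 0.71, so a gap between the two numbers can signal that the minority (churn) class is being missed.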

In [None]:
# initialize the random forest classifier
randomforest = RandomForestClassifier()

# set evaluator
f1_evaluator = MulticlassClassificationEvaluator(metricName = 'f1')

# build paramGrid
paramGrid = ParamGridBuilder().build()

randomforest_crossval = CrossValidator(estimator = randomforest,
                              estimatorParamMaps = paramGrid,
                              evaluator = f1_evaluator,
                              numFolds = 3)

In [None]:
start = time()
cvModel_rf = randomforest_crossval.fit(train)
end = time()
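# a bare expression in the middle of a cell is not displayed, so print it
print('Average cross-validation F1: {}'.format(cvModel_rf.avgMetrics))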
print('The training process took {} seconds'.format(end - start))

In [None]:
randomforest_result = cvModel_rf.transform(validation)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Random Forest Metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(randomforest_result, {evaluator.metricName: "accuracy"})))
print('F-1 Score: {}'.format(evaluator.evaluate(randomforest_result, {evaluator.metricName: "f1"})))

##### Support Vector Machines
The working principle of support vector machines is to find the hyperplane in an N-dimensional space that distinctly separates the data points. The dimension of the hyperplane depends on the number of features. Hyperplanes are basically the decision boundaries that help classify the data: data points located on different sides of the hyperplane are assigned to different classes. Support vector machines are commonly used in face detection, text and hypertext categorization, image classification, bioinformatics, and handwriting recognition. One real-world example is described in the article "Application of support vector machine modeling for prediction of common diseases: the case of diabetes and pre-diabetes".
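
To make the "side of the hyperplane" idea concrete, here is a minimal plain-Python sketch of how a linear SVM scores a point once a weight vector and bias are known, together with the hinge loss that linear SVMs typically minimize during training. The weights, bias, and points are hypothetical values chosen for illustration, not parameters learned from the Sparkify data.

```python
def decision_value(weights, bias, point):
    """Signed score w . x + b; its sign tells which side of the hyperplane x is on."""
    return sum(w * x for w, x in zip(weights, point)) + bias

def classify(weights, bias, point):
    """Assign class 1 on the non-negative side of the hyperplane, class 0 otherwise."""
    return 1 if decision_value(weights, bias, point) >= 0 else 0

def hinge_loss(y, score):
    """Hinge loss for a label y in {-1, +1}; zero once the margin is at least 1."""
    return max(0.0, 1.0 - y * score)

# Hypothetical hyperplane x1 - x2 = 0 in a two-feature space.
weights, bias = [1.0, -1.0], 0.0
print(classify(weights, bias, [2.0, 1.0]))  # point on the positive side
print(classify(weights, bias, [1.0, 2.0]))  # point on the negative side
print(hinge_loss(1, decision_value(weights, bias, [2.0, 1.0])))
```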


In [None]:
# initialize classifier
svm = LinearSVC(maxIter = 10)

# set evaluator
f1_evaluator = MulticlassClassificationEvaluator(metricName='f1')

# build paramGrid
paramGrid = ParamGridBuilder().build()

svm_crossval = CrossValidator(estimator=svm,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=3)

In [None]:
start = time()
cvModel_svm = svm_crossval.fit(train)
end = time()
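# a bare expression in the middle of a cell is not displayed, so print it
print('Average cross-validation F1: {}'.format(cvModel_svm.avgMetrics))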
print('The training process took {} seconds'.format(end - start))

In [None]:
svm_result = cvModel_svm.transform(validation)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('SVM Metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(svm_result, {evaluator.metricName: "accuracy"})))
print('F-1 Score: {}'.format(evaluator.evaluate(svm_result, {evaluator.metricName: "f1"})))

##### Decision Tree Classifier

In decision analysis, decision trees provide a visual and explicit representation of decisions and of the decision-making procedure. A decision tree is a flowchart-like tree structure in which each internal node denotes a test, each branch represents an outcome of the test, and each leaf node holds a corresponding class label. Decision trees are among the most powerful and popular tools for classification and prediction. They are commonly used in medical diagnosis, failure prediction, credit scoring, and crime risk investigation. Several business analysis examples can be found; one widely used application is the detection of fraudulent financial statements.
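
The node, branch, and leaf structure described above can be sketched as a nested dictionary with a small traversal function. The tree below is hand-written for illustration only: the feature names `thumbs_down` and `days_active` and the thresholds are hypothetical and were not learned from this dataset.

```python
# Internal nodes hold a test (feature and threshold); leaves hold a class label.
tree = {
    "feature": "thumbs_down", "threshold": 5,
    "left": {"label": 0},  # few thumbs-down events: predict "no churn"
    "right": {
        "feature": "days_active", "threshold": 30,
        "left": {"label": 1},   # many thumbs-down and a short tenure: predict "churn"
        "right": {"label": 0},
    },
}

def predict(node, row):
    """Follow the branch chosen by each test until a leaf is reached."""
    while "label" not in node:
        branch = "left" if row[node["feature"]] <= node["threshold"] else "right"
        node = node[branch]
    return node["label"]

print(predict(tree, {"thumbs_down": 2, "days_active": 10}))
print(predict(tree, {"thumbs_down": 9, "days_active": 10}))
print(predict(tree, {"thumbs_down": 9, "days_active": 60}))
```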

In [None]:
dt = DecisionTreeClassifier()

# set evaluator
f1_evaluator = MulticlassClassificationEvaluator(metricName='f1')

# build paramGrid
paramGrid = ParamGridBuilder().build()

dt_crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=f1_evaluator,
                          numFolds=3)

In [None]:
start = time()
cvModel_dt = dt_crossval.fit(train)
end = time()
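# a bare expression in the middle of a cell is not displayed, so print it
print('Average cross-validation F1: {}'.format(cvModel_dt.avgMetrics))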
print('The training process took {} seconds'.format(end - start))

In [None]:
dt_result = cvModel_dt.transform(validation)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Decision Tree Metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(dt_result, {evaluator.metricName: "accuracy"})))
print('F-1 Score: {}'.format(evaluator.evaluate(dt_result, {evaluator.metricName: "f1"})))

Model summary:
* The logistic regression model has an accuracy of 0.735 and an F1 score of 0.622, using 1047.5 seconds on our server.
* The gradient boosted trees model has an accuracy of 0.776 and an F1 score of 0.756, using 1560.5 seconds on our server.
* The support vector machine model has an accuracy of 0.735 and an F1 score of 0.622, using 1184.0 seconds on our server.
* The random forest model has an accuracy of 0.776 and an F1 score of 0.708, using 1220.6 seconds on our server.

Although we do care about training time, the data size is still relatively small and the difference in F1 score is large (0.756 for GBT versus 0.622 to 0.708 for the other models), so we prefer the model that performs best. Therefore, we choose the GBT model as our final model and conduct a grid search to fine-tune it.
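
A grid search evaluates every combination of the candidate hyperparameter values, which is what `ParamGridBuilder` produces once `addGrid` calls are added to it. The plain-Python sketch below only illustrates how such a grid expands; the candidate values for `maxDepth` and `maxIter` are hypothetical and are not the values used in this project.

```python
from itertools import product

def build_param_grid(candidates):
    """Expand {name: [values]} into a list of dicts, one per combination."""
    names = list(candidates)
    return [dict(zip(names, combo))
            for combo in product(*(candidates[name] for name in names))]

# Hypothetical candidate values for two GBT hyperparameters.
gbt_candidates = {"maxDepth": [3, 5], "maxIter": [10, 20]}
grid = build_param_grid(gbt_candidates)
for params in grid:
    print(params)

# An empty set of candidates still yields one (empty) combination.
print(build_param_grid({}))
```

With two values for each of two parameters there are four combinations, so a 3-fold cross-validation would need about twelve model fits plus a final refit. Given the training times listed above, the grid is worth keeping small.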

In the future, we may instead use the random forest model for better time efficiency.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.