# Sparkify Project Workspace
This workspace contains a tiny subset (128 MB) of the full dataset (12 GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [123]:
# import libraries
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [124]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Our first Python Spark SQL example") \
    .getOrCreate()

#### View the Spark configuration

In [125]:
spark.sparkContext.getConf().getAll()

[('spark.app.name', 'Our first Python Spark SQL example'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '43847'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '18b82acc310f'),
 ('spark.app.id', 'local-1632961054301'),
 ('spark.ui.showConsoleProgress', 'true')]

In [126]:
spark

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without a userId or sessionId.

#### Load Dataset

In [127]:
path = "mini_sparkify_event_data.json"
data_churn = spark.read.json(path)

In [128]:
print('No of Rows:', data_churn.count(),'\nNo of columns:', len(data_churn.columns))

No of Rows: 286500 
No of columns: 18


In [129]:
data_churn.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



#### Initial dataset analysis

In [130]:
from pyspark.sql.functions import isnull

cols = data_churn.columns
table = {}

for col in cols:
    table[col] = {}
    table[col]['unique'] = data_churn.select(col).distinct().count()
    table[col]['null'] = data_churn.filter(isnull(data_churn[col])).count()
    table[col]['empty'] = data_churn.filter(data_churn[col] == '').count()
    
    if table[col]['unique'] < 4:
        table[col]['unique_list'] = data_churn.select(col).distinct().toPandas()[col].unique()
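The profiling loop computes three counts per column: distinct values, nulls, and empty strings. A minimal plain-Python sketch of the same counts, assuming events are dicts (the helper `profile_columns` and the toy events are hypothetical), shows that a missing value counts as one of the distinct values, which is why `gender` reports 3 unique values (`F`, `M` and `None`) before filtering.

```python
from typing import Dict, List


def profile_columns(rows: List[dict], columns: List[str]) -> Dict[str, Dict[str, int]]:
    """Count distinct, null and empty-string values per column (hypothetical helper)."""
    table = {}
    for name in columns:
        values = [row.get(name) for row in rows]
        table[name] = {
            # None stays in the set, like a null row in Spark's distinct()
            "unique": len(set(values)),
            "null": sum(1 for v in values if v is None),
            "empty": sum(1 for v in values if v == ""),
        }
    return table


events = [
    {"userId": "30", "gender": "M"},
    {"userId": "9", "gender": "F"},
    {"userId": "", "gender": None},
]
print(profile_columns(events, ["userId", "gender"]))
```

Wrapping the loop in a function like this would also remove the need to copy it for the second profiling pass after filtering.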

In [131]:
import pandas as pd

pd_table = pd.DataFrame.from_dict(table).transpose()
pd_table

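| column | empty | null | unique | unique_list |
|---|---|---|---|---|
| artist | 0 | 58392 | 17656 | |
| auth | 0 | 0 | 4 | |
| firstName | 0 | 8346 | 190 | |
| gender | 0 | 8346 | 3 | [F, None, M] |
| itemInSession | 0 | 0 | 1322 | |
| lastName | 0 | 8346 | 174 | |
| length | 0 | 58392 | 14866 | |
| level | 0 | 0 | 2 | [free, paid] |
| location | 0 | 8346 | 115 | |
| method | 0 | 0 | 2 | [PUT, GET] |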


#### Filter data (null and empty values)

In [132]:
data_churn = data_churn.filter(data_churn['userId'] != '')

In [133]:
data_churn.count()

278154
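The filter removes 286500 - 278154 = 8346 rows, the same number as the missing `firstName`, `gender` and `location` values in the profile above. In Spark, `userId != ''` evaluates to null for a null `userId`, so null ids are dropped as well. A plain-Python sketch of the rule from the section introduction (drop records without a userId or sessionId), using a hypothetical helper `is_valid_event`:

```python
def is_valid_event(event: dict) -> bool:
    """Keep an event only if it has a non-empty userId and a sessionId (hypothetical helper)."""
    user_id = event.get("userId")
    session_id = event.get("sessionId")
    # an empty string and a missing value are both treated as "no user"
    return user_id not in (None, "") and session_id is not None


events = [
    {"userId": "30", "sessionId": 29},
    {"userId": "", "sessionId": 8},
    {"userId": "9", "sessionId": None},
]
cleaned = [e for e in events if is_valid_event(e)]
print(len(cleaned))  # 1
```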

#### Initial dataset analysis (after filter)

In [134]:
table2 = {}

for col in cols:
    table2[col] = {}
    table2[col]['unique'] = data_churn.select(col).distinct().count()
    table2[col]['null'] = data_churn.filter(isnull(data_churn[col])).count()
    table2[col]['empty'] = data_churn.filter(data_churn[col] == '').count()
    
    if table2[col]['unique'] < 4:
        table2[col]['unique_list'] = data_churn.select(col).distinct().toPandas()[col].unique()

In [135]:
import pandas as pd

pd_table2 = pd.DataFrame.from_dict(table2).transpose()
pd_table2

| column | empty | null | unique | unique_list |
|---|---|---|---|---|
| artist | 0 | 50046 | 17656 | |
| auth | 0 | 0 | 2 | [Cancelled, Logged In] |
| firstName | 0 | 0 | 189 | |
| gender | 0 | 0 | 2 | [F, M] |
| itemInSession | 0 | 0 | 1311 | |
| lastName | 0 | 0 | 173 | |
| length | 0 | 50046 | 14866 | |
| level | 0 | 0 | 2 | [free, paid] |
| location | 0 | 0 | 114 | |
| method | 0 | 0 | 2 | [PUT, GET] |


#### Formatting the timestamp

In [136]:
data_churn.select(data_churn.ts).show(2)

+-------------+
|           ts|
+-------------+
|1538352117000|
|1538352180000|
+-------------+
only showing top 2 rows



In [137]:
from pyspark.sql.functions import udf
import datetime

# Create a user-defined function that formats the millisecond timestamp as a string
get_time = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).strftime("%Y-%m-%d %H:%M:%S"))

# Apply the UDF to the ts column
data_churn = data_churn.withColumn("time", get_time(data_churn.ts))

In [138]:
data_churn.select(
    ['userId', 'ts', 'time', 'registration']
).take(5)

[Row(userId='30', ts=1538352117000, time='2018-10-01 00:01:57', registration=1538173362000),
 Row(userId='9', ts=1538352180000, time='2018-10-01 00:03:00', registration=1538331630000),
 Row(userId='30', ts=1538352394000, time='2018-10-01 00:06:34', registration=1538173362000),
 Row(userId='9', ts=1538352416000, time='2018-10-01 00:06:56', registration=1538331630000),
 Row(userId='30', ts=1538352676000, time='2018-10-01 00:11:16', registration=1538173362000)]
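`ts` and `registration` are epoch timestamps in milliseconds, which is why the UDF divides by 1000. `datetime.fromtimestamp` without a `tz` argument uses the local time zone of the Python worker, so the formatted strings depend on the machine; the values above line up with UTC. A plain-Python sketch that pins the zone explicitly (the helper name `format_ts` is hypothetical). Inside Spark, the built-in `pyspark.sql.functions.from_unixtime` avoids a Python UDF altogether, although it formats in the session time zone.

```python
from datetime import datetime, timezone


def format_ts(ts_ms: int) -> str:
    """Format a millisecond epoch timestamp as 'YYYY-MM-DD HH:MM:SS' in UTC."""
    return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


print(format_ts(1538352117000))  # 2018-10-01 00:01:57
```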

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

#### Gender

In [139]:
from pyspark.sql.functions import countDistinct

gender_count = data_churn.groupBy('gender').agg(countDistinct('userId'))
gender_count.show()

+------+----------------------+
|gender|count(DISTINCT userId)|
+------+----------------------+
|     F|                   104|
|     M|                   121|
+------+----------------------+



In [140]:
gender_count.toPandas()

| | gender | count(DISTINCT userId) |
|---|---|---|
| 0 | F | 104 |
| 1 | M | 121 |


In [141]:
import plotly.express as px

fig = px.pie(
    gender_count.toPandas(),
    values= 'count(DISTINCT userId)', names= 'gender', title= '% of users by gender')
fig.show()

#### Top 10 States

In [142]:
location_count = data_churn.groupBy('location').agg(countDistinct('userId')).toPandas()

location_count = location_count.join(location_count['location'].str.split(',',expand=True).rename(
    columns={0:'city',1:'state'})).drop('location',axis=1)

location_count.columns = ['count_userId', 'city', 'state']
state_count = location_count.groupby('state').count_userId.sum().sort_values(ascending= False).head(10)
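Splitting `location` on the comma leaves a leading space on the state part (for example `' CA'`), and a metro area that spans several states keeps a combined code such as `NY-NJ-PA`, so it is counted as a "state" of its own. A plain-Python sketch of a split that strips the whitespace; `split_location` is a hypothetical helper and the example locations are illustrative. In pandas, `.str.strip()` on the split columns would have the same effect.

```python
from typing import Tuple


def split_location(location: str) -> Tuple[str, str]:
    """Split 'City, ST' into (city, state), stripping surrounding whitespace."""
    # partition on the last comma so everything before it stays in the city part
    city, _, state = location.rpartition(",")
    return city.strip(), state.strip()


print(split_location("Bakersfield, CA"))  # ('Bakersfield', 'CA')
print(split_location("New York-Newark-Jersey City, NY-NJ-PA"))  # ('New York-Newark-Jersey City', 'NY-NJ-PA')
```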

In [143]:
import plotly.express as px

fig = px.pie(
    state_count.reset_index(),
    values= 'count_userId', names= 'state', title= 'Top 10 States with more users')
fig.show()

#### Number of Users

In [144]:
data_churn.select('userId').distinct().count()

225

#### Users by level

In [145]:
level_count = data_churn.groupBy('level').agg(countDistinct('userId'))
level_count.show()

+-----+----------------------+
|level|count(DISTINCT userId)|
+-----+----------------------+
| free|                   195|
| paid|                   165|
+-----+----------------------+



In [146]:
import plotly.express as px

fig = px.pie(
    level_count.toPandas(),
    values= 'count(DISTINCT userId)', names= 'level', title= '% of users by level')
fig.show()

#### Number of Artists

In [147]:
data_churn.select('artist').distinct().count()

17656

#### Number of Songs

In [148]:
data_churn.select('song').distinct().count()

58481

#### Status

In [149]:
data_churn.select('status').distinct().show()

+------+
|status|
+------+
|   307|
|   404|
|   200|
+------+



#### Authentication

In [150]:
data_churn.groupBy('auth').agg(countDistinct('userId')).show(5)

+---------+----------------------+
|     auth|count(DISTINCT userId)|
+---------+----------------------+
|Cancelled|                    52|
|Logged In|                   225|
+---------+----------------------+



In [151]:
df_cleaned = data_churn

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

In [152]:
#find the flag event
df_cleaned.select('page').dropDuplicates().collect()

[Row(page='Cancel'),
 Row(page='Submit Downgrade'),
 Row(page='Thumbs Down'),
 Row(page='Home'),
 Row(page='Downgrade'),
 Row(page='Roll Advert'),
 Row(page='Logout'),
 Row(page='Save Settings'),
 Row(page='Cancellation Confirmation'),
 Row(page='About'),
 Row(page='Settings'),
 Row(page='Add to Playlist'),
 Row(page='Add Friend'),
 Row(page='NextSong'),
 Row(page='Thumbs Up'),
 Row(page='Help'),
 Row(page='Upgrade'),
 Row(page='Error'),
 Row(page='Submit Upgrade')]

In [153]:
from pyspark.sql.types import IntegerType

#flag each event: 1 for 'Cancellation Confirmation', 0 for every other page
flag_event = udf(lambda x : 1 if x=='Cancellation Confirmation' else 0, IntegerType())

#row-level churn state (1 only on the cancellation event itself)
df_cleaned_cancel = df_cleaned.withColumn('Churn_state',flag_event('page'))

In [154]:
from pyspark.sql import Window
from pyspark.sql.functions import max as fmax

#label every row of a user with 1 if any of that user's events is a churn event
userwindow = Window.partitionBy('userId').rangeBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df_cleaned_cancel = df_cleaned_cancel.withColumn('Churn',fmax('Churn_state').over(userwindow))
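The window gives every row of a user the maximum `Churn_state` over all of that user's rows, so a user is labelled 1 if any of their events is a `Cancellation Confirmation`. The same rule in plain Python, as a sketch over `(userId, page)` pairs (`label_churn` is a hypothetical helper). Inside Spark, `when(...).otherwise(...)` from `pyspark.sql.functions` could produce `Churn_state` without a Python UDF.

```python
from typing import Dict, Iterable, Tuple


def label_churn(events: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Return {userId: 1 if the user ever hit 'Cancellation Confirmation' else 0}."""
    labels: Dict[str, int] = {}
    for user_id, page in events:
        flag = 1 if page == "Cancellation Confirmation" else 0
        # max() over a user's flags mirrors fmax('Churn_state').over(userwindow)
        labels[user_id] = max(labels.get(user_id, 0), flag)
    return labels


events = [("30", "NextSong"), ("9", "Cancel"), ("9", "Cancellation Confirmation"), ("30", "Home")]
print(label_churn(events))  # {'30': 0, '9': 1}
```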

In [155]:
df_cleaned_cancel.dropDuplicates(['userId']).select('Churn').groupby('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|   52|
|    0|  173|
+-----+-----+



In [156]:
gender_count_churn = df_cleaned_cancel.filter(df_cleaned_cancel['Churn'] == 1).groupBy('gender').agg(countDistinct('userId'))

In [157]:
import plotly.express as px

fig = px.pie(
    gender_count_churn.toPandas(),
    values= 'count(DISTINCT userId)', names= 'gender', title= '% of users by gender who canceled')
fig.show()

In [158]:
location_count_churn = df_cleaned_cancel.filter(df_cleaned_cancel['Churn'] == 1
                                        ).groupBy('location').agg(countDistinct('userId')).toPandas()

location_count_churn = location_count_churn.join(location_count_churn['location'].str.split(',',expand=True).rename(
    columns={0:'city',1:'state'})).drop('location',axis=1)

location_count_churn.columns = ['count_userId', 'city', 'state']
state_count_churn = location_count_churn.groupby('state').count_userId.sum().sort_values(ascending= False).head(10)

In [159]:
import plotly.express as px

fig = px.pie(
    state_count_churn.reset_index(),
    values= 'count_userId', names= 'state', title= 'Top 10 States with more users who canceled')
fig.show()

#### Browsers

In [160]:
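def get_browser(x):
    '''
    map a userAgent string to a browser family
    '''
    if x is None:
        return 'Other'
    if 'Firefox' in x:
        return 'Firefox'
    elif 'Safari' in x:
        # Chrome user agents also contain "Safari", so Chrome is identified inside this branch
        if 'Chrome' in x:
            return 'Chrome'
        else:
            return 'Safari'
    elif 'Trident' in x:
        return 'IE'
    else:
        # unrecognized user agents get an explicit label
        return 'Other'

In [161]:
from pyspark.sql.types import StringType

browser = udf(get_browser, StringType())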

In [162]:
df_cleaned_cancel = df_cleaned_cancel.withColumn('browser',browser(df_cleaned_cancel.userAgent))
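Because Chrome's user-agent string also contains "Safari", the order of the checks matters. A quick plain-Python check of the same checks, in the same order, with an `'Other'` fallback for unrecognized agents; the sample strings are typical desktop user agents chosen for illustration, not rows taken from the dataset.

```python
def get_browser(x):
    """Map a userAgent string to a browser family."""
    if x is None:
        return "Other"
    if "Firefox" in x:
        return "Firefox"
    if "Safari" in x:
        # Chrome agents also advertise Safari, so Chrome is decided inside this branch
        return "Chrome" if "Chrome" in x else "Safari"
    if "Trident" in x:
        return "IE"
    return "Other"


samples = {
    "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0": "Firefox",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36": "Chrome",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4": "Safari",
    "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; WOW64; Trident/6.0)": "IE",
}
for agent, expected in samples.items():
    assert get_browser(agent) == expected
```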

#### Platform

In [163]:
import re

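platform_dict = {'compatible': 'Windows', 'iPad': 'iPad', 'iPhone': 'iPhone',
                 'Macintosh': 'Mac', 'Windows NT 5.1': 'Windows', 'Windows NT 6.0': 'Windows',
                 'Windows NT 6.1': 'Windows', 'Windows NT 6.2': 'Windows', 'Windows NT 6.3': 'Windows',
                 'X11': 'Linux'}

def get_platform_name(x):
    '''
    map the first token inside the first parentheses of a userAgent to a platform
    '''
    match = re.findall(r'\(([^\)]*)\)', x or '')
    if not match:
        return 'Other'
    # unknown tokens fall back to 'Other' instead of raising a KeyError
    return platform_dict.get(match[0].split(';')[0], 'Other')

get_platform = udf(get_platform_name)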

df_cleaned_cancel = df_cleaned_cancel.withColumn('platform',get_platform(df_cleaned_cancel.userAgent))
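The platform is read from the first token inside the first pair of parentheses of the user agent, and a plain dictionary index raises `KeyError` for any token that is not listed. A plain-Python sketch of the lookup with an `'Other'` fallback (the names `PLATFORMS` and `get_platform_name` are hypothetical; the sample agents are illustrative):

```python
import re

PLATFORMS = {
    "compatible": "Windows", "iPad": "iPad", "iPhone": "iPhone", "Macintosh": "Mac",
    "Windows NT 5.1": "Windows", "Windows NT 6.0": "Windows", "Windows NT 6.1": "Windows",
    "Windows NT 6.2": "Windows", "Windows NT 6.3": "Windows", "X11": "Linux",
}


def get_platform_name(agent):
    """Return the platform for the first token inside the first (...) group of a userAgent."""
    groups = re.findall(r"\(([^\)]*)\)", agent or "")
    if not groups:
        return "Other"
    # e.g. "Windows NT 6.1; WOW64" -> "Windows NT 6.1"
    return PLATFORMS.get(groups[0].split(";")[0], "Other")


print(get_platform_name("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:31.0) Gecko/20100101 Firefox/31.0"))  # Linux
```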

#### Time Variables

In [164]:
# fromtimestamp() converts in the local time zone of the Python worker running the UDF
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour, IntegerType())
df_cleaned_cancel = df_cleaned_cancel.withColumn('hour', get_hour(df_cleaned_cancel.ts))

get_day = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).day, IntegerType())
df_cleaned_cancel = df_cleaned_cancel.withColumn('day', get_day(df_cleaned_cancel.ts))

# '%w' numbers the days 0 (Sunday) to 6 (Saturday)
get_weekday = udf(lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).strftime('%w')), IntegerType())
df_cleaned_cancel = df_cleaned_cancel.withColumn('dayofweek', get_weekday(df_cleaned_cancel.ts))
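`strftime('%w')` numbers the days 0 (Sunday) through 6 (Saturday), so Monday to Friday are 1 to 5. A plain-Python sketch of the three derived fields for one timestamp, computed in UTC (the helper name `time_parts` is hypothetical; the cells above use the worker's local zone instead):

```python
from datetime import datetime, timezone


def time_parts(ts_ms: int):
    """Return (hour, day of month, weekday with Sunday = 0) for a millisecond timestamp in UTC."""
    moment = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return moment.hour, moment.day, int(moment.strftime("%w"))


print(time_parts(1538352117000))  # (0, 1, 1): 2018-10-01 was a Monday
print(time_parts(1538352117000 - 86_400_000))  # (0, 30, 0): Sunday 2018-09-30
```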

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

#### On the basis of the above EDA, we can create features as follows:

- Categorical Features (need label encoding)
    - gender
    - level
    - browser
    - platform
- Numerical Features
    - mean, max, min, and standard deviation of song length per user
    - per-user counts of these page events (NextSong, Thumbs Up, Thumbs Down, Add to Playlist, Add Friend, Roll Advert) and their total
    - number of unique songs and total songs played per user
    - number of unique artists per user
    - percentage of a user's events on or after the 15th of the month
    - percentage of a user's events on workdays
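The two percentage features are ratios of a user's event count in a subset to that user's total event count. A plain-Python sketch for one user's timestamps; `activity_shares` is hypothetical, it works in UTC, and it counts Monday to Friday as workdays, which differs from the `dayofweek < 5` filter (Sunday to Thursday under `%w` numbering) used in the feature code below.

```python
from datetime import datetime, timezone
from typing import List, Tuple

DAY_MS = 86_400_000


def activity_shares(ts_list: List[int]) -> Tuple[float, float]:
    """Return (share of events on/after the 15th, share of events on Monday-Friday)."""
    moments = [datetime.fromtimestamp(ts / 1000.0, tz=timezone.utc) for ts in ts_list]
    total = len(moments)
    late_month = sum(1 for m in moments if m.day >= 15)
    # weekday(): Monday = 0 ... Sunday = 6, so < 5 means Monday-Friday
    workday = sum(1 for m in moments if m.weekday() < 5)
    return late_month / total, workday / total


base = 1538352117000  # Monday 2018-10-01 00:01:57 UTC
events = [base, base - DAY_MS, base + 14 * DAY_MS, base + 19 * DAY_MS]  # Mon, Sun, Mon, Sat
print(activity_shares(events))  # (0.75, 0.5)
```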

#### Categorical Features

In [165]:
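def label_encoding(col_name):
    '''
    map each distinct value of col_name to a numeric string ('0', '1', ...) and
    return one row per user with the encoded value.
    note: dropDuplicates(['userId']) keeps an arbitrary row per user, so a user
    who changed level (free/paid) gets whichever value that row holds.
    '''
    temp = df_cleaned_cancel.select([col_name]).dropDuplicates().toPandas()
    label_dict = {val:str(idx) for idx,val in enumerate(temp[col_name].tolist())}
    result = df_cleaned_cancel.dropDuplicates(['userId']).select(['userId',col_name]).replace(label_dict,subset=col_name)
    return result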

In [166]:
def get_categorical_features():
    '''
    join all categorical features together
    '''
    feature_gender = label_encoding('gender')
    feature_level = label_encoding('level')
    feature_browser = label_encoding('browser')
    feature_platform = label_encoding('platform')
    
    result = feature_gender.join(feature_level,on='userId',how='inner').\
                            join(feature_browser,on='userId',how='inner').\
                            join(feature_platform,on='userId',how='inner')
    return result
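`label_encoding` numbers categories in whatever order `dropDuplicates()` returns them, so the codes can differ between runs, or between this subset and the full dataset. A sketch of a deterministic mapping that sorts the categories first (`build_label_map` is hypothetical). In Spark ML, `pyspark.ml.feature.StringIndexer` (frequency-ordered by default) is the built-in way to do this inside a `Pipeline`, and for nominal features such as browser or platform a one-hot encoding avoids implying an order between the codes.

```python
from typing import Dict, Iterable, Optional


def build_label_map(values: Iterable[Optional[str]]) -> Dict[str, int]:
    """Map each distinct non-null category to an integer, in sorted order."""
    categories = sorted({v for v in values if v is not None})
    return {category: index for index, category in enumerate(categories)}


level_map = build_label_map(["paid", "free", "paid", None])
print(level_map)  # {'free': 0, 'paid': 1}
```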

In [167]:
categorical_feature = get_categorical_features()

In [168]:
categorical_feature.show(2)

+------+------+-----+-------+--------+
|userId|gender|level|browser|platform|
+------+------+-----+-------+--------+
|100010|     0|    0|      1|       1|
|200002|     1|    0|      1|       1|
+------+------+-----+-------+--------+
only showing top 2 rows



#### Numerical Features

In [169]:
from pyspark.sql.functions import avg, stddev, first, col
from pyspark.sql.functions import min as fmin

def get_numerical_features():
    '''
    compute the per-user numerical features and join them together (scaling is applied later)
    '''
    #length
    feature_length = df_cleaned_cancel.select(["userId", "length"]).groupby(["userId"]).agg(avg('length').alias('mean_length'), 
                                                                     stddev('length').alias('stdev_length'),
                                                                    fmax('length').alias('max_length'),
                                                                    fmin('length').alias('min_length'))
    
    #page, reference url:https://stackoverflow.com/questions/56051438/pivot-table-in-pyspark
    page_count = df_cleaned_cancel.select(["userId","page"]).groupby(["userId","page"]).count()
    # create the pivot table
    temp1 = page_count.groupby('userId').pivot('page').agg(first('count')).fillna(0)
    # filter columns
    temp1 = temp1.select(['userId','NextSong','Thumbs Up', 'Thumbs Down', 'Add to Playlist', 'Add Friend', 'Roll Advert'])
    # column names used to sum up for total
    cols = temp1.columns[1:]
    # calculate the total 
    feature_page = temp1.withColumn('total', sum([col(c) for c in cols]))    
    
    
    #unique songs number
    feature_nunique_song = df_cleaned_cancel.filter(df_cleaned_cancel.page=='NextSong').select(["userId","song"]).\
                                             dropDuplicates(["userId","song"]).groupby(["userId"]).count()
    feature_nunique_song = feature_nunique_song.selectExpr("userId as userId","count as nunique_song")
    
    #total songs number
    feature_ntotal_song = df_cleaned_cancel.filter(df_cleaned_cancel.page=='NextSong').select(["userId","song"]).\
                                             groupby(["userId"]).count()
    #source:https://exceptionshub.com/how-to-change-dataframe-column-names-in-pyspark.html
    feature_ntotal_song = feature_ntotal_song.selectExpr("userId as userId","count as ntotal_song")
    
    #number of unique artists
    feature_nunique_artist = df_cleaned_cancel.filter(df_cleaned_cancel.page=='NextSong').select(["userId","artist"]).\
                                               dropDuplicates(["userId","artist"]).groupby(["userId"]).count()
    feature_nunique_artist = feature_nunique_artist.selectExpr("userId as userId","count as nunique_artist")
    
    #percentage of operations
    day_count = df_cleaned_cancel.filter(df_cleaned_cancel.day>=15).select(['userId']).groupby(["userId"]).count()
    day_count = day_count.selectExpr("userId as userId","count as day_count")
    
    total_count = df_cleaned_cancel.select(['userId']).groupby(["userId"]).count()
    total_count = total_count.selectExpr("userId as userId","count as total_count")
    
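    # strftime('%w') numbers Sunday as 0, so dayofweek < 5 counts Sunday-Thursday;
    # df_cleaned_cancel.dayofweek.between(1, 5) would count Monday-Friday instead
    dayofweek_count = df_cleaned_cancel.filter(df_cleaned_cancel.dayofweek<5).select(['userId']).groupby(["userId"]).count()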
    dayofweek_count = dayofweek_count.selectExpr("userId as userId","count as dayofweek_count")
    
    feature_percentage_month = (total_count.alias("total").join(day_count.alias("day"), ["userId"]).\
                              select(col("userId"), (col("day.day_count") / col("total.total_count")).alias("month_percentage")))
    
    feature_percentage_week = (total_count.alias("total").join(dayofweek_count.alias("day"), ["userId"]).\
                          select(col("userId"), (col("day.dayofweek_count") / col("total.total_count")).alias("week_percentage")))
    
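    #merge together; the inner joins keep only users present in every table, so a user
    #with no events in one of the filtered counts above (e.g. none on or after the 15th) is dropped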
    result = feature_length.join(feature_page,on='userId',how='inner').\
                        join(feature_nunique_song,on='userId',how='inner').\
                        join(feature_ntotal_song,on='userId',how='inner').\
                        join(feature_nunique_artist,on='userId',how='inner').\
                        join(feature_percentage_month,on='userId',how='inner').\
                        join(feature_percentage_week,on='userId',how='inner')
    
    return result
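The pivot turns per-user page counts into one column per page, and `fillna(0)` gives a user who never visited a page a count of 0. A plain-Python sketch of that step plus the `total` column (the helper `page_features` and the sample events are hypothetical); every user keeps a row with zeros for missing pages, which is also what a left join followed by `fillna(0)` would do for the percentage features.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

PAGES = ["NextSong", "Thumbs Up", "Thumbs Down", "Add to Playlist", "Add Friend", "Roll Advert"]


def page_features(events: Iterable[Tuple[str, str]]) -> Dict[str, Dict[str, int]]:
    """Count selected page events per user, filling missing pages with 0, plus a total."""
    counts = Counter(events)  # (userId, page) -> count
    users = sorted({user for user, _ in counts})
    features = {}
    for user in users:
        row = {page: counts.get((user, page), 0) for page in PAGES}
        row["total"] = sum(row.values())
        features[user] = row
    return features


sample_events = [("30", "NextSong"), ("30", "NextSong"), ("30", "Thumbs Up"), ("9", "Home")]
print(page_features(sample_events)["30"])
# {'NextSong': 2, 'Thumbs Up': 1, 'Thumbs Down': 0, 'Add to Playlist': 0, 'Add Friend': 0, 'Roll Advert': 0, 'total': 3}
```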

In [170]:
numerical_features = get_numerical_features()

In [171]:
numerical_features.show(2)

+------+------------------+----------------+----------+----------+--------+---------+-----------+---------------+----------+-----------+-----+------------+-----------+--------------+------------------+------------------+
|userId|       mean_length|    stdev_length|max_length|min_length|NextSong|Thumbs Up|Thumbs Down|Add to Playlist|Add Friend|Roll Advert|total|nunique_song|ntotal_song|nunique_artist|  month_percentage|   week_percentage|
+------+------------------+----------------+----------+----------+--------+---------+-----------+---------------+----------+-----------+-----+------------+-----------+--------------+------------------+------------------+
|100010|  243.421444909091|79.5156544698794| 538.85342|  52.27057|     275|       17|          5|              7|         4|         52|  360|         269|        275|           252|0.4120734908136483|0.5485564304461942|
|200002|242.91699209302305| 75.447490312729| 592.06485|  26.56608|     387|       21|          6|              8|   

#### Label

In [172]:
label = df_cleaned_cancel.select(['userId','Churn']).dropDuplicates()

#### Final data

In [173]:
from pyspark.sql.types import FloatType

def get_data_for_train():
    '''
    merge features and label together
    '''
    categorical_feature = get_categorical_features()
    numerical_feature = get_numerical_features()
    label = df_cleaned_cancel.select(['userId','Churn']).dropDuplicates()
    result = categorical_feature.join(numerical_feature,on='userId',how='inner').join(label,on='userId',how='inner')
    
    #cast the label-encoded columns to int and the numerical features to float
    for col_name in result.columns[1:5]:
        result = result.withColumn(col_name, result[col_name].cast(IntegerType()))
    for col_name in result.columns[5:-1]:
        result = result.withColumn(col_name, result[col_name].cast(FloatType()))
    
    #fill null/NaN values (e.g. the standard deviation of a user with a single song) with 0
    result = result.na.fill(0)
    
    return result

In [174]:
final_data = get_data_for_train()

In [175]:
final_data.columns

['userId',
 'gender',
 'level',
 'browser',
 'platform',
 'mean_length',
 'stdev_length',
 'max_length',
 'min_length',
 'NextSong',
 'Thumbs Up',
 'Thumbs Down',
 'Add to Playlist',
 'Add Friend',
 'Roll Advert',
 'total',
 'nunique_song',
 'ntotal_song',
 'nunique_artist',
 'month_percentage',
 'week_percentage',
 'Churn']

In [176]:
#checkpoint final_data to disk as JSON
final_data.write.mode('overwrite').json('final_data_new.json')

In [177]:
# JSON schema inference returns the columns in alphabetical order, with integers as long and floats as double
final_data = spark.read.json('final_data_new.json')

In [178]:
final_data_columns = [
    'Add Friend', 'Add to Playlist', 'NextSong', 'Roll Advert', 'Thumbs Down', 'Thumbs Up', 'browser',
    'gender', 'level', 'max_length', 'mean_length', 'min_length', 'month_percentage', 'ntotal_song',
    'nunique_artist', 'nunique_song', 'platform', 'stdev_length', 'total', 'week_percentage'
]

In [179]:
categorical_features = ['gender','level','browser','platform']

In [180]:
numerical_features = [
    'Add Friend', 'Add to Playlist', 'NextSong', 'Roll Advert', 'Thumbs Down', 'Thumbs Up',
    'max_length', 'mean_length', 'min_length', 'month_percentage', 'ntotal_song',
    'nunique_artist', 'nunique_song', 'stdev_length', 'total', 'week_percentage'
]

In [181]:
check_df = final_data.toPandas()

In [182]:
check_df.shape

(204, 22)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
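With `metricName='f1'`, Spark's `MulticlassClassificationEvaluator` reports a weighted F1: the per-class F1 scores averaged with weights equal to each class's share of the true labels. A plain-Python sketch of that calculation (`weighted_f1` is a hypothetical helper) makes the reported scores easier to interpret:

```python
from typing import Sequence


def weighted_f1(labels: Sequence[int], predictions: Sequence[int]) -> float:
    """Average the per-class F1 scores, weighting each class by its number of true labels."""
    total = len(labels)
    score = 0.0
    for cls in set(labels):
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        fp = sum(1 for y, p in zip(labels, predictions) if y != cls and p == cls)
        fn = sum(1 for y, p in zip(labels, predictions) if y == cls and p != cls)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * labels.count(cls) / total
    return score


print(round(weighted_f1([0, 0, 1, 1], [0, 1, 1, 1]), 4))  # 0.7333
```

On a balanced dataset like the undersampled one used below, each class gets weight 0.5, so weighted F1 is simply the mean of the two per-class F1 scores.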

In [183]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier,RandomForestClassifier, LinearSVC
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [184]:
#standardize the numerical features (the scaler is fit on all rows, before the train/test split)
vector = VectorAssembler(inputCols=numerical_features, outputCol='numerical_features')
temp = vector.transform(final_data)
scaler = StandardScaler(withMean=True, withStd=True, inputCol='numerical_features', outputCol='features_scaled')
scaler_fit = scaler.fit(temp)
result_scaled = scaler_fit.transform(temp)

#add categorical features to feature vector

vector = VectorAssembler(inputCols=categorical_features+['features_scaled'], outputCol='all_features')
result_scaled = vector.transform(result_scaled)
final_result = result_scaled.select(result_scaled.Churn.alias('label'), result_scaled.all_features.alias('features'))
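With `withMean=True, withStd=True`, `StandardScaler` maps each numerical feature to z = (x - mean) / std, where Spark uses the corrected (n - 1) sample standard deviation. A plain-Python sketch of fitting those statistics on one set of rows and applying them to another (`fit_scaler` and `apply_scaler` are hypothetical). Fitting on the training rows only would keep test-set statistics out of the scaling, whereas the cell above fits before the split.

```python
from statistics import mean, stdev
from typing import List, Tuple


def fit_scaler(column: List[float]) -> Tuple[float, float]:
    """Return the mean and the sample (n - 1) standard deviation of a feature."""
    return mean(column), stdev(column)


def apply_scaler(column: List[float], stats: Tuple[float, float]) -> List[float]:
    """Standardize values with previously fitted statistics."""
    mu, sigma = stats
    # guard against a constant feature, whose standard deviation is 0
    return [(x - mu) / sigma if sigma else 0.0 for x in column]


train_lengths = [1.0, 2.0, 3.0]
stats = fit_scaler(train_lengths)
print(apply_scaler(train_lengths, stats))  # [-1.0, 0.0, 1.0]
print(apply_scaler([4.0], stats))  # [2.0]
```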

In [185]:
from pyspark.sql.functions import rand

def undersample(df): 
    '''
    Implement undersample on dataset, return a balanced dataset.
    '''    
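    # size of the minority class (label 1 = churned)
    minoritySize = df.where(df.label == 1).count()
    
    # two classes with the same size; sample(1.0) keeps every majority row, so
    # limit() takes the first minoritySize rows Spark returns rather than a random subset
    df_minority = df.where(df.label == 1)
    df_majority = df.where(df.label == 0).sample(1.0, seed=7).limit(minoritySize)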
    
    # concatenate them together
    result = df_minority.union(df_majority)
    
    #shuffle data
    result = result.orderBy(rand())
    return result
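Random undersampling keeps every minority-class row (churned users, label 1) and a random subset of the same size from the majority class. A plain-Python sketch with a seeded `random.Random` (`undersample_rows` and the toy rows are hypothetical); in Spark, ordering the majority rows by `rand(seed)` before `limit()` would play the role of `rng.sample` here.

```python
import random
from typing import Dict, List


def undersample_rows(rows: List[Dict], label_key: str = "label", seed: int = 7) -> List[Dict]:
    """Balance a binary dataset by randomly dropping majority-class rows."""
    minority = [r for r in rows if r[label_key] == 1]
    majority = [r for r in rows if r[label_key] == 0]
    rng = random.Random(seed)
    # sample without replacement, as many majority rows as there are minority rows
    kept = rng.sample(majority, k=min(len(minority), len(majority)))
    balanced = minority + kept
    rng.shuffle(balanced)
    return balanced


rows = [{"id": i, "label": 1 if i < 3 else 0} for i in range(10)]
balanced = undersample_rows(rows)
print(sum(r["label"] for r in balanced), len(balanced))  # 3 6
```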

In [186]:
balanced_data = undersample(final_result)

In [187]:
check_balanced_df = balanced_data.toPandas()

In [188]:
#check out
balanced_data.groupby(balanced_data.label).count().show()

+-----+-----+
|label|count|
+-----+-----+
|    0|   38|
|    1|   38|
+-----+-----+



In [189]:
train, test = balanced_data.randomSplit([0.7, 0.3], seed=7)

In [190]:
test.count()

27

#### Base Model

In [191]:
# Initialize four models
clf_LR = LogisticRegression(maxIter=50)
clf_DT = DecisionTreeClassifier(seed=7)
clf_RF = RandomForestClassifier(seed=7)
clf_SVM = LinearSVC()

In [192]:
evaluator= MulticlassClassificationEvaluator(predictionCol="prediction")

In [194]:
import time
import pandas as pd

# collect results on the learners
all_results = {}
for clf in [clf_LR, clf_DT, clf_RF, clf_SVM]:
    model_results = {}
    # get the classifier name
    clf_name = clf.__class__.__name__
    
    
    # fit the dataset
    print(f'{clf_name} is training...')
    start = time.time() 
    model = clf.fit(train)
    end = time.time() 
    model_results['train_time'] = round(end-start,6)
    
    # predict
    print(f'{clf_name} is predicting...')
    start = time.time() 
    pred_test = model.transform(test)
    end = time.time()
    model_results['pred_time'] = round(end-start,6)
    
    #metrics
    print(f'{clf_name} is evaluating...')    
    model_results['accuracy_test'] = evaluator.evaluate(pred_test.select('label','prediction'),{evaluator.metricName: "accuracy"})
    model_results['f1_test'] = evaluator.evaluate(pred_test.select('label','prediction'),{evaluator.metricName: 'f1'})
    print('Test F1-score: ',model_results['f1_test'])
    all_results[clf_name] = model_results
    
all_results_df = pd.DataFrame(all_results)
all_results_df.to_csv('baseline.csv')

LogisticRegression is training...
LogisticRegression is predicting...
LogisticRegression is evaluating...
Test F1-score:  0.6085858585858586
DecisionTreeClassifier is training...
DecisionTreeClassifier is predicting...
DecisionTreeClassifier is evaluating...
Test F1-score:  0.5198467432950191
RandomForestClassifier is training...
RandomForestClassifier is predicting...
RandomForestClassifier is evaluating...
Test F1-score:  0.46851851851851856
LinearSVC is training...
LinearSVC is predicting...
LinearSVC is evaluating...
Test F1-score:  0.702075702075702


In [195]:
all_results_df

| metric | LogisticRegression | DecisionTreeClassifier | RandomForestClassifier | LinearSVC |
|---|---|---|---|---|
| accuracy_test | 0.62963 | 0.518519 | 0.481481 | 0.703704 |
| f1_test | 0.608586 | 0.519847 | 0.468519 | 0.702076 |
| pred_time (s) | 0.069128 | 0.085323 | 0.097728 | 0.067985 |
| train_time (s) | 7.769362 | 3.177573 | 2.835393 | 41.628851 |


#### Tuning

#### LinearSVC 

In [196]:
paramGrid = ParamGridBuilder().\
            addGrid(clf_SVM.maxIter, [10, 100, 1000]).\
            addGrid(clf_SVM.regParam, [0.01,0.1,10.0,100.0]).\
            build()
crossval = CrossValidator(estimator=clf_SVM,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                      numFolds=3)

In [197]:
start = time.time()
cvModel_SVM = crossval.fit(train)
end = time.time()
print(f'Model tuning is done, spent {end-start}s.')

Model tuning is done, spent 1008.4389734268188s.


In [198]:
cvModel_SVM.avgMetrics

[0.5412394622920939,
 0.5190584177890679,
 0.28302140693445044,
 0.28302140693445044,
 0.6287696365095745,
 0.6488380537400145,
 0.28302140693445044,
 0.28302140693445044,
 0.6308823529411764,
 0.6488380537400145,
 0.28302140693445044,
 0.28302140693445044]
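`avgMetrics` holds one mean cross-validated F1 per parameter combination, in the order `ParamGridBuilder` generated them: a Cartesian product in which the first grid added (`maxIter`) varies slowest. A plain-Python sketch that pairs the values printed above with their settings and picks the best one; on the fitted model, `cvModel_SVM.getEstimatorParamMaps()` should return the same combinations in the same order.

```python
from itertools import product

max_iters = [10, 100, 1000]
reg_params = [0.01, 0.1, 10.0, 100.0]
avg_metrics = [
    0.5412394622920939, 0.5190584177890679, 0.28302140693445044, 0.28302140693445044,
    0.6287696365095745, 0.6488380537400145, 0.28302140693445044, 0.28302140693445044,
    0.6308823529411764, 0.6488380537400145, 0.28302140693445044, 0.28302140693445044,
]

# itertools.product varies the last list fastest, matching the ParamGridBuilder order
grid = list(product(max_iters, reg_params))
scores = dict(zip(grid, avg_metrics))
best = max(grid, key=scores.get)  # the first combination wins a tie
print(best, round(scores[best], 4))  # (100, 0.1) 0.6488
```

Read this way, every combination with `regParam` 10 or 100 scores 0.283, which suggests those settings regularize the model too strongly for this small dataset.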

In [209]:
pred = cvModel_SVM.transform(test)

print('LinearSVC')
print('Accuracy: {}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "f1"})))

LinearSVC
Accuracy: 0.7037037037037037
F-1 Score:0.7045177045177045


In [200]:
cvModel_SVM.write().overwrite().save('svm_model')

#### LogisticRegression 

In [201]:
paramGrid = ParamGridBuilder().\
            addGrid(clf_LR.elasticNetParam,[0.1, 0.5, 1]).\
            addGrid(clf_LR.regParam,[0.01, 0.05, 0.1]).\
            build()

crossval = CrossValidator(estimator=clf_LR,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName="f1"),
                          numFolds=3)

In [202]:
start = time.time()
cvModel_LR = crossval.fit(train)
end = time.time()
print(f'Model tuning is done, spent {end-start}s.')

Model tuning is done, spent 136.24612164497375s.


In [203]:
cvModel_LR.avgMetrics

[0.5160577621361936,
 0.5639907850434166,
 0.512063492063492,
 0.5405675660577621,
 0.4528659611992945,
 0.44100354895549404,
 0.5405675660577621,
 0.43093267432792726,
 0.31227920227920225]

In [210]:
pred = cvModel_LR.transform(test)

print('LogisticRegression')
print('Accuracy: {}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(pred.select('label','prediction'), {evaluator.metricName: "f1"})))

LogisticRegression
Accuracy: 0.7407407407407407
F-1 Score:0.7407407407407408


In [205]:
cvModel_LR.write().overwrite().save('lr_model')

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.