# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [54]:
# import libraries
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, MultilayerPerceptronClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import numpy as np
import re

In [55]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()
spark.sparkContext.getConf().getAll()  # inspect the configuration of the session

# SparkContext that backs the session
sc = spark.sparkContext

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data, for example records without user IDs or session IDs.

In [56]:
path = "mini_sparkify_event_data.json"
sparkify_log = spark.read.json(path)

In [57]:
sparkify_log.head(n=2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [58]:
df = sparkify_log.toPandas()

In [59]:
df.loc[df['sessionId'] == 8]

| | artist | auth | firstName | gender | itemInSession | lastName | length | level | location | method | page | registration | sessionId | song | status | ts | userAgent | userId |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Five Iron Frenzy | Logged In | Micah | M | 79 | Long | 236.09424 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Canada | 200 | 1538352180000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 3 | Enigma | Logged In | Micah | M | 80 | Long | 262.71302 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Knocking On Forbidden Doors | 200 | 1538352416000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 5 | The All-American Rejects | Logged In | Micah | M | 81 | Long | 208.29995 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Don't Leave Me | 200 | 1538352678000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 6 | The Velvet Underground / Nico | Logged In | Micah | M | 82 | Long | 260.46649 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Run Run Run | 200 | 1538352886000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 10 | Britt Nicole | Logged In | Micah | M | 83 | Long | 229.8771 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Walk On The Water | 200 | 1538353146000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 11 | | Logged In | Micah | M | 84 | Long | | free | Boston-Cambridge-Newton, MA-NH | GET | Roll Advert | 1538332000000.0 | 8 | | 200 | 1538353150000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 13 | Tesla | Logged In | Micah | M | 85 | Long | 201.06404 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Gettin' Better | 200 | 1538353375000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 14 | | Logged In | Micah | M | 86 | Long | | free | Boston-Cambridge-Newton, MA-NH | PUT | Thumbs Up | 1538332000000.0 | 8 | | 307 | 1538353376000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 16 | Florence + The Machine | Logged In | Micah | M | 87 | Long | 168.64608 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | You've Got The Love | 200 | 1538353576000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |
| 19 | Ratatat | Logged In | Micah | M | 88 | Long | 229.77261 | free | Boston-Cambridge-Newton, MA-NH | PUT | NextSong | 1538332000000.0 | 8 | Swisha | 200 | 1538353744000 | "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... | 9.0 |


In [60]:
df['userAgent'].unique()

array([ 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0',
       '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"',
       '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
       'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:32.0) Gecko/20100101 Firefox/32.0',
       None,
       'Mozilla/5.0 (Windows NT 6.0; rv:31.0) Gecko/20100101 Firefox/31.0',
       'Mozilla/5.0 (Windows NT 6.2; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0',
       '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
       'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:31.0) Gecko/20100101 Firefox/31.0',
       '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"',
       '"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS

In [61]:
df['page'].unique()

array(['NextSong', 'Add to Playlist', 'Roll Advert', 'Thumbs Up',
       'Downgrade', 'Thumbs Down', 'Home', 'Logout', 'Help', 'Login',
       'Upgrade', 'Add Friend', 'About', 'Settings', 'Submit Upgrade',
       'Submit Downgrade', 'Error', 'Save Settings', 'Cancel',
       'Cancellation Confirmation', 'Register', 'Submit Registration'], dtype=object)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, follow these steps:
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

## Define Churn
"Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events."

In [62]:
# As label, a new column 'downgrade' is added. It flags both 'Submit Downgrade' and 'Cancellation Confirmation' events
churn = (df['page'].isin(['Submit Downgrade', 'Cancellation Confirmation'])).astype(int)
df = df.assign(downgrade = churn) # adding new column
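The labeling rule can be illustrated on a handful of toy events. The minimal sketch below, in plain Python with hypothetical `(userId, page)` tuples, flags every churn event and then marks a user as churned if any of their events is flagged, which corresponds to the per-user `max()` used for *user_df* later on. A plain `Cancel` page view is not counted; only the confirmation is.

```python
# Pages that count as a churn event in this notebook
CHURN_PAGES = {'Submit Downgrade', 'Cancellation Confirmation'}


def label_events(events):
    """Return a 0/1 flag for every (userId, page) event."""
    return [int(page in CHURN_PAGES) for _, page in events]


def label_users(events):
    """A user is labelled 1 if at least one of their events is a churn event."""
    labels = {}
    for (user_id, _), flag in zip(events, label_events(events)):
        labels[user_id] = max(labels.get(user_id, 0), flag)
    return labels


# Hypothetical toy events
events = [
    ('30', 'NextSong'), ('30', 'Submit Downgrade'),
    ('9', 'NextSong'), ('9', 'Thumbs Up'),
    ('54', 'Cancel'), ('54', 'Cancellation Confirmation'),
]
print(label_events(events))  # [0, 1, 0, 0, 0, 1]
print(label_users(events))   # {'30': 1, '9': 0, '54': 1}
```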

The data can be viewed in two different ways. On the one hand, the data can be viewed as the sequence of a user's interactions, i.e. their customer journey, which may lead to a downgrade. This view is stored in the *session_df* data frame.

On the other hand, key facts about a user can be aggregated independently of the individual sessions. These are collected in the data frame *user_df*.

## Session_df & User_df
Some preprocessing steps are identical for both data frames, so they are performed once on the original data frame *df*.

In [63]:
# Convert level to binary column
df['level'] = (df['level'] == 'paid').astype(int)

In [64]:
# Events of logged-out users have an empty userId (''). They cannot be assigned to a user and are dropped
df.userId = df.userId.replace('', np.nan)
print(df.shape)
df = df.dropna(subset = ['userId'])
print(df.shape)

(286500, 19)
(278154, 19)


In [65]:
# Modify the *userId* to split each user journey into several parts.
# As long as a user does not change the level (free <-> paid), the *userId* does not change.
# Each resulting userId can therefore contain a downgrade at most as its last step.
for userId, item in df.groupby(by='userId'):
    if item['level'].nunique() == 1:
        # User does not perform an upgrade or downgrade -> no need to adapt the userId
        continue
    # Approach adapted from https://stackoverflow.com/questions/31132659/pandas-get-elements-until-value-changes
    # A new segment starts wherever the level differs from the previous row; the segment
    # index is computed as a new Series, so the group slice `item` is never modified
    segment = (item['level'] != item['level'].shift(1)).cumsum() - 1
    df.loc[item.index, 'userId'] = userId + '_' + segment.astype(str)
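The shift/cumsum approach above starts a new segment whenever `level` differs from the previous row of the same user. The sketch below restates that logic in plain Python for one user's chronologically ordered `level` values; the function name `segment_user_ids` is hypothetical, and, as in the notebook, a user whose level never changes keeps the original id.

```python
def segment_user_ids(user_id, levels):
    """Return one id per event: '<user_id>_<k>', where k counts the level changes so far.

    `levels` holds the user's chronologically ordered level values (0 = free, 1 = paid).
    """
    # As in the notebook, a user whose level never changes keeps the original id
    if len(set(levels)) <= 1:
        return [user_id] * len(levels)

    segment_ids = []
    segment = -1
    previous = None
    for level in levels:
        # A new segment starts whenever the level differs from the previous event
        if level != previous:
            segment += 1
            previous = level
        segment_ids.append('{}_{}'.format(user_id, segment))
    return segment_ids


print(segment_user_ids('38', [0, 0, 1, 1, 0]))  # ['38_0', '38_0', '38_1', '38_1', '38_2']
print(segment_user_ids('9', [0, 0, 0]))         # ['9', '9', '9']
```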


In [66]:
#print('df.head():\n',df.head())
#print()
#print('df.tail():\n',df.tail())

In [67]:
#df[df['downgrade']==1][['userId','page']]
#df['page'].unique()
#df[df['userId'] == '38_0']

## Session_df

In [68]:
# TODO itemInSession
session_df = df[['userId', 'sessionId', 'level', 'itemInSession', 'location', 'status', 'ts', 'downgrade']].copy()
session_df.head()

| | userId | sessionId | level | itemInSession | location | status | ts | downgrade |
|---|---|---|---|---|---|---|---|---|
| 0 | 30_0 | 29 | 1 | 50 | Bakersfield, CA | 200 | 1538352117000 | 0 |
| 1 | 9_0 | 8 | 0 | 79 | Boston-Cambridge-Newton, MA-NH | 200 | 1538352180000 | 0 |
| 2 | 30_0 | 29 | 1 | 51 | Bakersfield, CA | 200 | 1538352394000 | 0 |
| 3 | 9_0 | 8 | 0 | 80 | Boston-Cambridge-Newton, MA-NH | 200 | 1538352416000 | 0 |
| 4 | 30_0 | 29 | 1 | 52 | Bakersfield, CA | 200 | 1538352676000 | 0 |


### 1) Check if location changes during the session 
For each *user_id* and each session, the location is compared. No user has more than one location within a single session, so nobody travels during a session. The column is therefore removed from the data frame, as it does not contain valuable information.

In [69]:
# Remove rows without a location (e.g. connection issues); they are irrelevant for the question whether a user travels
session_df = session_df[pd.notnull(session_df['location'])]

# The location does not change within any (userId, sessionId) pair
assert (session_df.groupby(by = ['userId', 'sessionId'])['location'].nunique() == 1).all()

session_df = session_df.drop('location', axis = 1)

### 2) Calculate length of current session
Assumption: the longer a session lasts, the more the user interacts with the service. This suggests that the user is satisfied with the service and less likely to downgrade.

In [70]:
# Session length = last minus first timestamp of each (userId, sessionId), in milliseconds
session_df = session_df.reset_index(drop = True)
session_ts = session_df.groupby(by = ['userId', 'sessionId'])['ts']
# transform() returns one value per row, aligned with session_df; the new column is appended last
session_df['currSessionLength'] = session_ts.transform('max') - session_ts.transform('min')
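`currSessionLength` is the span between the first and the last logged event of a `(userId, sessionId)` pair, in milliseconds because `ts` is an epoch timestamp in milliseconds. It therefore does not include the duration of the last song, and a session with a single event has length 0. A plain-Python sketch, using only the five rows shown in `session_df.head()` above as toy input (so the resulting lengths are not the real session lengths):

```python
def session_lengths(events):
    """Map (userId, sessionId) to last ts minus first ts, in milliseconds."""
    bounds = {}
    for user_id, session_id, ts in events:
        key = (user_id, session_id)
        first, last = bounds.get(key, (ts, ts))
        bounds[key] = (min(first, ts), max(last, ts))
    return {key: last - first for key, (first, last) in bounds.items()}


# (userId, sessionId, ts) taken from the session_df.head() output above
events = [
    ('30_0', 29, 1538352117000),
    ('9_0', 8, 1538352180000),
    ('30_0', 29, 1538352394000),
    ('9_0', 8, 1538352416000),
    ('30_0', 29, 1538352676000),
]
print(session_lengths(events))  # {('30_0', 29): 559000, ('9_0', 8): 236000}
```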


### 3) Convert status into binary *error* column

In [71]:
# 1 if the request returned HTTP status 404 (Not Found), else 0
session_df['error'] = (session_df['status'] == 404).astype(int)
session_df = session_df.drop(['status'], axis = 1)

In [72]:
print('Final columns of session_df: ', session_df.columns)

Final columns of session_df:  Index(['userId', 'sessionId', 'level', 'itemInSession', 'ts', 'downgrade',
       'currSessionLength', 'error'],
      dtype='object')


## User_df
To calculate user-specific, time-independent values, some information from the original data frame *df* is needed. The required columns are buffered in *user_df_tmp*; the aggregated values are stored in *user_df*.

In [89]:
# TODO itemInSession
user_df_tmp = df[['userId', 'sessionId', 'level', 'ts', 'page', 'registration', 'userAgent', 'downgrade']]
columns = ['userId', 'level', 'nb_devices', 'avgSessionLength', 'lastInteraction', 'freqOfUse', 'downgrade']
user_df = pd.DataFrame(np.zeros((user_df_tmp.userId.unique().shape[0],len(columns)), dtype=int), columns = columns)

# Set UserId
user_df.userId = user_df_tmp.userId.unique()

# Set Downgrade
for userId, downgrade in user_df_tmp.groupby(by = 'userId')['downgrade']:
    user_df.loc[userId == user_df.userId, 'downgrade'] = downgrade.max()

# Set Level
for userId, level in user_df_tmp.groupby(by = 'userId')['level']:
    assert len(level.unique()) == 1
    user_df.loc[userId == user_df.userId, 'level'] = level.unique()[0]
    
# Set lastInteraction
for userId, timestamp in user_df_tmp.groupby(by = 'userId')['ts']:
    user_df.loc[userId == user_df.userId, 'lastInteraction'] = timestamp.max()
    
print(user_df.head())

  userId  level  nb_devices  avgSessionLength  lastInteraction  freqOfUse  \
0   30_0      1           0                 0    1538995454000          0   
1    9_0      0           0                 0    1538839066000          0   
2   74_0      0           0                 0    1539939591000          0   
3   54_0      1           0                 0    1539608060000          0   
4    4_0      0           0                 0    1540121320000          0   

   downgrade  
0          1  
1          0  
2          0  
3          1  
4          0  


### 1) nb_devices
Hypothesis: the more devices a user uses, the more engaged the user is and the less likely a downgrade becomes.

-> As every user uses only one device, the column is deleted.

In [90]:
# Set nb_devices: the text inside the first pair of parentheses of the userAgent is treated as the device
for userId, userAgent in user_df_tmp.groupby(by = 'userId')['userAgent']:
    devices = [item[1].split(')')[0] for item in userAgent.str.split('(')]
    
    tmp = set(devices) # The number of different devices is interesting, not how often a device was used
    
    if 'compatible' in tmp and 'Windows' in tmp:
        user_df.loc[userId == user_df.userId, 'nb_devices'] = len(tmp)-1
    else:
        user_df.loc[userId == user_df.userId, 'nb_devices'] = len(tmp)

# Every user is seen on exactly one device
assert (user_df.nb_devices == 1).all()
user_df = user_df.drop('nb_devices', axis =1)
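The device count treats the text inside the first pair of parentheses of `userAgent` as the device. The sketch below reproduces that parsing in plain Python on three user agents from the list printed earlier; the helper names `platform` and `count_devices` are hypothetical. Firefox puts its `rv:` version inside the parentheses, so under this heuristic the same Windows machine used with Firefox and with Chrome would count as two devices.

```python
def platform(user_agent):
    """Return the text inside the first pair of parentheses of a user agent."""
    # Same parsing as item[1].split(')')[0] after splitting on '('
    return user_agent.split('(')[1].split(')')[0]


def count_devices(user_agents):
    """Number of distinct platforms a user was seen on."""
    return len({platform(ua) for ua in user_agents})


agents = [
    'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0',
    '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"',
    '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"',
]
print(platform(agents[0]))    # Windows NT 6.1; WOW64; rv:31.0
print(count_devices(agents))  # 3
```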

### 2) avg_session_length
Hypothesis: the longer a user's sessions are, the less likely it is that the user will downgrade the service.

In [91]:
for userId, elem in user_df_tmp.groupby(by = 'userId'):
    sessionLength = {}
    for sessionId, timestamp in elem.groupby(by = 'sessionId')['ts']:
        sessionLength[sessionId] = max(timestamp) - min(timestamp)

    # Each session contributes one value to the user's average
    user_df.loc[userId == user_df.userId, 'avgSessionLength'] = pd.Series(sessionLength).mean()
    

print(user_df.head())

  userId  level  avgSessionLength  lastInteraction  freqOfUse  downgrade
0   30_0      1      1.889567e+07    1538995454000          0          1
1    9_0      0      6.605500e+06    1538839066000          0          0
2   74_0      0      8.611714e+06    1539939591000          0          0
3   54_0      1      2.944917e+07    1539608060000          0          1
4    4_0      0      2.950778e+06    1540121320000          0          0
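`avgSessionLength` first reduces every session to one length and then averages over sessions, so each session counts once no matter how many events it contains. A plain-Python sketch with hypothetical `(userId, sessionId, ts)` events, where `ts` is in milliseconds:

```python
from statistics import mean


def avg_session_length(events):
    """Mean of the per-session (max ts - min ts) of every user, in milliseconds."""
    sessions = {}
    for user_id, session_id, ts in events:
        sessions.setdefault(user_id, {}).setdefault(session_id, []).append(ts)
    return {
        user_id: mean(max(ts_list) - min(ts_list) for ts_list in by_session.values())
        for user_id, by_session in sessions.items()
    }


# Hypothetical toy events
events = [
    ('a', 1, 0), ('a', 1, 60_000),                       # session 1 lasts 60 s
    ('a', 2, 0), ('a', 2, 180_000), ('a', 2, 90_000),    # session 2 lasts 180 s
    ('b', 7, 5_000),                                     # single-event session lasts 0 s
]
print(avg_session_length(events))  # {'a': 120000, 'b': 0}
```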


### 3) frequency of use

In [92]:
# How many days are part of the dataset? (ts is in milliseconds; 86400000 ms = 1 day)
nb_days_in_data = (max(user_df_tmp.ts)-min(user_df_tmp.ts))/86400000
print('The data set contains information about {} days.'.format(nb_days_in_data))

for userId, timestamps in user_df_tmp.groupby(by = 'userId')['ts']:
    first_contact = min(timestamps)
    # /1000  - convert milliseconds to seconds
    # /86400 - seconds per day, so every event gets the index of the day since the user's first contact
    timestamps = (((timestamps - first_contact)/1000)/86400).astype(int)
    # len(timestamps.unique()) - on how many days did the user use the service?
    # nb_days_in_data          - which time period is covered by the data?
    user_df.loc[userId == user_df.userId, 'freqOfUse'] = len(timestamps.unique())/nb_days_in_data

The data set contains information about 63.04813657407407 days.
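`freqOfUse` is the number of distinct days (counted in 24-hour steps from the user's first event, not calendar days) on which the user was active, divided by the length of the observation period in days. For example, the value 0.063444 for user `30_0` corresponds to 4 active days out of about 63.05. A plain-Python sketch of the same computation, with hypothetical toy timestamps:

```python
MS_PER_DAY = 86_400_000  # 24 * 60 * 60 * 1000


def freq_of_use(user_ts, all_ts):
    """Active days of one user divided by the number of days covered by the data."""
    nb_days_in_data = (max(all_ts) - min(all_ts)) / MS_PER_DAY
    first_contact = min(user_ts)
    # Day index relative to the user's first event (0 = first 24 h, 1 = next 24 h, ...)
    active_days = {(ts - first_contact) // MS_PER_DAY for ts in user_ts}
    return len(active_days) / nb_days_in_data


# Hypothetical user: active on day 0 (twice) and on day 2 (twice) of a 10-day period
user_ts = [0, 3_600_000, 2 * MS_PER_DAY + 1, 2 * MS_PER_DAY + 5]
all_ts = user_ts + [10 * MS_PER_DAY]
print(freq_of_use(user_ts, all_ts))  # 0.2
```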


In [94]:
print('Final peek of user_df: \n', user_df.head())

Final peek of user_df: 
   userId  level  avgSessionLength  lastInteraction  freqOfUse  downgrade
0   30_0      1      1.889567e+07    1538995454000   0.063444          1
1    9_0      0      6.605500e+06    1538839066000   0.063444          0
2   74_0      0      8.611714e+06    1539939591000   0.095165          0
3   54_0      1      2.944917e+07    1539608060000   0.174470          1
4    4_0      0      2.950778e+06    1540121320000   0.142748          0


# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [95]:
# Convert pandas dataframe to spark dataframe (SparkSession.createDataFrame replaces the deprecated SQLContext)
#spark_session_df = spark.createDataFrame(session_df)
spark_user_df = spark.createDataFrame(user_df)

## Modeling for SessionDF
As none of the ML algorithms available in PySpark performs time-series-based predictions, no machine learning or predictions have been performed on this data frame.

## Modeling for UserDF
To make the three ML algorithms as comparable as possible, the randomSplit, the feature_assembler and the label_indexer are defined once at the start. All parameters of these parts are set directly; they are not modified in any of the ParamGrids.

The models are evaluated with the F1 score.
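Note that `MulticlassClassificationEvaluator` with `metricName = 'f1'` reports the F1 score weighted by the share of each true class (Spark's weighted F-measure), not the F1 score of the downgrade class alone. With imbalanced labels, a model that never predicts a downgrade can still score fairly high. The toy sketch below, in plain Python with hypothetical labels, compares both quantities; treating a class without true positives as F1 = 0 is an assumption of this sketch.

```python
def f1_per_class(y_true, y_pred, cls):
    """F1 score of a single class, treating that class as the positive one."""
    tp = sum(t == cls and p == cls for t, p in zip(y_true, y_pred))
    fp = sum(t != cls and p == cls for t, p in zip(y_true, y_pred))
    fn = sum(t == cls and p != cls for t, p in zip(y_true, y_pred))
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def weighted_f1(y_true, y_pred):
    """Per-class F1 weighted by each class's share of the true labels."""
    n = len(y_true)
    return sum(y_true.count(c) / n * f1_per_class(y_true, y_pred, c) for c in set(y_true))


# Hypothetical labels: 8 users stay (0), 2 downgrade (1); the model always predicts 0
y_true = [0] * 8 + [1] * 2
y_pred = [0] * 10
print(round(weighted_f1(y_true, y_pred), 4))  # 0.7111
print(f1_per_class(y_true, y_pred, 1))        # 0.0
```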

In [96]:
# Split the full dataset into a validation set and the remaining data.
#      - The remaining data is split into train and test folds by the CrossValidator (k-fold cross-validation)
rest_user, val_user = spark_user_df.randomSplit([0.8, 0.2], seed=42)
print('For validation, {} % of the data are used. These are {} data entries.'.format(20, val_user.count()))

# As input for the ML algorithms, a single vector column "features" is needed
feature_assembler = VectorAssembler(inputCols= ['level', 'avgSessionLength', 'lastInteraction', 'freqOfUse'], outputCol="features")
# By default, StringIndexer maps the most frequent value of 'downgrade' to label 0.0
label_indexer = StringIndexer(inputCol = 'downgrade', outputCol = 'label')

# Parameters used for all 3 models
used_evaluator = MulticlassClassificationEvaluator(metricName = 'f1') # as proposed, the F1 score is used
numFolds = 10
parallelism = 5


For validation, 20 % of the data are used. These are 78 data entries.
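`CrossValidator` performs k-fold cross-validation on `rest_user`: it divides the data into `numFolds` folds, fits every parameter combination on `numFolds - 1` folds, evaluates it on the remaining fold, and averages the scores into `avgMetrics`. The combination with the best average is then refit on all of `rest_user`, and that model is `bestModel`. The plain-Python sketch below mirrors this loop under two simplifying assumptions: rows are assigned to folds round-robin (Spark assigns them randomly), and `toy_score` is a hypothetical stand-in for fitting a model and computing its F1 score.

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (train_idx, test_idx); row i is assigned to fold i % num_folds."""
    for fold in range(num_folds):
        test_idx = [i for i in range(n_rows) if i % num_folds == fold]
        train_idx = [i for i in range(n_rows) if i % num_folds != fold]
        yield train_idx, test_idx


def cross_validate(param_maps, n_rows, num_folds, score):
    """Average the per-fold scores of every param map (cf. CrossValidatorModel.avgMetrics)."""
    avg_metrics = []
    for params in param_maps:
        fold_scores = [score(params, train_idx, test_idx)
                       for train_idx, test_idx in k_fold_indices(n_rows, num_folds)]
        avg_metrics.append(sum(fold_scores) / num_folds)
    return avg_metrics


# Hypothetical scorer standing in for "fit on train_idx, compute F1 on test_idx"
def toy_score(params, train_idx, test_idx):
    return 0.9 - abs(params['regParam'] - 0.1)


param_maps = [{'regParam': r} for r in (0.0, 0.1, 0.5)]
avg_metrics = cross_validate(param_maps, n_rows=20, num_folds=10, score=toy_score)
best = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(param_maps[best])  # {'regParam': 0.1}
```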


### Testing of several Machine Learning methods
Test out several of the machine learning methods you learned. 
- Define a parameter grid to test possible configurations
- Evaluate the accuracy of the various models, tuning parameters as necessary.
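`ParamGridBuilder` builds the Cartesian product of all values added via `addGrid`, and `CrossValidator` fits one model per combination and fold, so the training cost grows quickly. A plain-Python sketch using the logistic regression grid and `numFolds = 10` from this notebook:

```python
from itertools import product

# Values passed to ParamGridBuilder.addGrid for the logistic regression below
lr_grid = {
    'maxIter': [10, 100, 200],
    'regParam': [0.0, 0.1, 0.5],
    'threshold': [0.1, 0.5, 0.95, 1.0],
}
# Cartesian product of all values: one dict per parameter combination
param_maps = [dict(zip(lr_grid, values)) for values in product(*lr_grid.values())]

num_folds = 10
print(len(param_maps))              # 36
print(len(param_maps) * num_folds)  # 360 fits, plus one refit of the best combination
```

The random forest grid (three lists of four values each) gives 64 combinations in the same way, and therefore 640 fits with 10 folds.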

#### a) Logistic Regression

In [97]:
# Model and pipeline definition
lr =  LogisticRegression(family = 'binomial')
lr_pipeline = Pipeline(stages = [ feature_assembler, label_indexer, lr])

lr_paramGrid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 100, 200]) \
    .addGrid(lr.regParam,[0.0, 0.1, 0.5]) \
    .addGrid(lr.threshold, [0.1, 0.5, 0.95, 1.0])\
    .build()

# used for quick testing if the implementation is correct
#lr_paramGrid = ParamGridBuilder() \
#    .addGrid(lr.maxIter, [10]) \
#    .addGrid(lr.regParam,[0.1]) \
#    .addGrid(lr.threshold, [0.95, 0.1])\
#    .build()


lr_crossval = CrossValidator(estimator=lr_pipeline,
                             estimatorParamMaps=lr_paramGrid,
                             evaluator= used_evaluator,
                             numFolds= numFolds,
                             parallelism = parallelism)

In [98]:
# Train the pipeline
lr_model_user = lr_crossval.fit(rest_user)

In [99]:
# Extract the parameters of the best model
print('The {} different LogisticRegressionModels have performed the following F1-Scores: {}'
      .format(len(lr_model_user.avgMetrics), lr_model_user.avgMetrics))

# https://forums.databricks.com/questions/12097/pyspark-how-to-get-best-params-in-grid-search.html
lr_bestPipeline = lr_model_user.bestModel
lr_bestModel = lr_bestPipeline.stages[-1]
lr_bestParams = lr_bestModel.extractParamMap()

maxIter = -1
regParam = -1
threshold = -1

for key, value in lr_bestParams.items():
    if 'maxIter' in str(key):
        maxIter = value
    elif 'regParam' in str(key):
        regParam = value
    elif 'threshold' in str(key):
        threshold = value


print('The best LogisticRegressionModel used the following parameters\n'+
      ' - maxIter: {}\n'
      ' - regParam: {}\n'
      ' - threshold: {}\n'
      'The other parameters have not been modified by the ParamGrid.'.format(maxIter,regParam,threshold))

print('The best LogisticRegressionModel has a mean cross-validation F1 score of {} and an F1 score of {} on the validation set.'
      .format(max(lr_model_user.avgMetrics), used_evaluator.evaluate(lr_model_user.bestModel.transform(val_user))))

The 36 different LogisticRegressionModels have performed the following F1-Scores: [0.09751015954775846, 0.6498861190010575, 0.6513642487144814, 0.6513642487144814, 0.09751015954775846, 0.655718542155996, 0.6513642487144814, 0.6513642487144814, 0.09751015954775846, 0.6513642487144814, 0.6513642487144814, 0.6513642487144814, 0.5194885335280396, 0.8310818702900145, 0.6594440915769328, 0.6513642487144814, 0.28971935783143116, 0.7399613886923692, 0.6513642487144814, 0.6513642487144814, 0.09751015954775846, 0.6513642487144814, 0.6513642487144814, 0.6513642487144814, 0.5517812924706815, 0.8662525968146704, 0.6653345442415791, 0.6513642487144814, 0.28546145092646547, 0.7399613886923692, 0.6513642487144814, 0.6513642487144814, 0.09751015954775846, 0.6513642487144814, 0.6513642487144814, 0.6513642487144814]
The best LogisticRegressionModel used the following parameters
 - maxIter: 200
 - regParam: 0.0
 - threshold: 0.5
The other parameters have not been modified by the ParamGrid.
The best Linear

#### b) Random Forest

In [100]:
# Model and pipeline definition
rf = RandomForestClassifier()
rf_pipeline = Pipeline(stages = [feature_assembler, label_indexer, rf])

rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 5, 10, 30]) \
    .addGrid(rf.numTrees,[2, 5, 10, 50]) \
    .addGrid(rf.maxBins, [2, 5, 10, 50])\
    .build()

# used for quick testing if the implementation is correct
#rf_paramGrid = ParamGridBuilder() \
#    .addGrid(rf.maxDepth, [2]) \
#    .addGrid(rf.numTrees,[5]) \
#    .addGrid(rf.maxBins, [10])\
#    .build()


rf_crossval = CrossValidator(estimator=rf_pipeline,
                             estimatorParamMaps=rf_paramGrid,
                             evaluator= used_evaluator,
                             numFolds= numFolds,
                             parallelism = parallelism)

In [101]:
# Train the pipeline
rf_model_user = rf_crossval.fit(rest_user)

In [102]:
# Extract the parameters of the best model
print('The {} different RandomForestModels have performed the following F1-Scores: {}'
      .format(len(rf_model_user.avgMetrics), rf_model_user.avgMetrics))

# https://forums.databricks.com/questions/12097/pyspark-how-to-get-best-params-in-grid-search.html
rf_bestPipeline = rf_model_user.bestModel
rf_bestModel = rf_bestPipeline.stages[-1]
rf_bestParams = rf_bestModel.extractParamMap()

maxDepth = -1
numTrees = -1
maxBins = -1

#print(rf_bestParams)

for key, value in rf_bestParams.items():
    if 'maxDepth' in str(key):
        maxDepth = value
    elif 'numTrees' in str(key):
        numTrees = value
    elif 'maxBins' in str(key):
        maxBins = value

print('The best RandomForestModel used the following parameters\n'+
      ' - maxDepth: {}\n'
      ' - numTrees: {}\n'
      ' - maxBins: {}\n'
      'All other parameters have not been modified by the ParamGrid.'.format(maxDepth,numTrees,maxBins))

print('The best RandomForestModel has a mean cross-validation F1 score of {} and an F1 score of {} on the validation set.'
      .format(max(rf_model_user.avgMetrics), used_evaluator.evaluate(rf_model_user.bestModel.transform(val_user))))

The 64 different RandomForestModels have performed the following F1-Scores: [0.8333191102314182, 0.8845271267434718, 0.8679282271459141, 0.8849698360703269, 0.7230946188202829, 0.7337658985029604, 0.719429872721099, 0.7485916673946131, 0.8446079709494563, 0.8408957294260888, 0.8383629388335967, 0.8597173471003176, 0.8774091604042005, 0.886246607812798, 0.8842173873641396, 0.8806761375283226, 0.8700596819258921, 0.8812530598537055, 0.849353987697975, 0.881113504181326, 0.8751675087721961, 0.8867573178600371, 0.8767434083304155, 0.8879826522110085, 0.8774091604042005, 0.8805003932430541, 0.8743519337566839, 0.8910334807854925, 0.8774091604042005, 0.8854651845493088, 0.8863406029306845, 0.8785390560267556, 0.8700596819258921, 0.8739013642365522, 0.8204846822162835, 0.8425822799440499, 0.8751675087721961, 0.8804603201238784, 0.8458489833721055, 0.8698285821110343, 0.8774091604042005, 0.8829537420226011, 0.8765026702389823, 0.8812726761156863, 0.8774091604042005, 0.8777462024295947, 0.88438

#### c) Gradient-boosted tree classifier

In [103]:
# Model and pipeline definition
gbt = GBTClassifier()
gbt_pipeline = Pipeline(stages = [feature_assembler, label_indexer, gbt])

gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 5, 10, 30]) \
    .addGrid(gbt.stepSize, [0.03, 0.1, 0.5, 0.9]) \
    .addGrid(gbt.maxBins, [2, 5, 10])\
    .build()

# used for quick testing if the implementation is correct
#gbt_paramGrid = ParamGridBuilder() \
#    .addGrid(gbt.maxDepth, [5]) \
#    .addGrid(gbt.stepSize, [0.1]) \
#    .addGrid(gbt.maxBins, [5])\
#    .build()


gbt_crossval = CrossValidator(estimator=gbt_pipeline,
                             estimatorParamMaps=gbt_paramGrid,
                             evaluator= used_evaluator,
                             numFolds= numFolds,
                             parallelism = parallelism)

In [104]:
# Train the pipeline
gbt_model_user = gbt_crossval.fit(rest_user)

In [105]:
# Extract the parameters of the best model
print('The {} different GradientBoostedTreeClassifierModels have performed the following F1-Scores: {}'
      .format(len(gbt_model_user.avgMetrics), gbt_model_user.avgMetrics))

# https://forums.databricks.com/questions/12097/pyspark-how-to-get-best-params-in-grid-search.html
gbt_bestPipeline = gbt_model_user.bestModel
gbt_bestModel = gbt_bestPipeline.stages[-1]
gbt_bestParams = gbt_bestModel.extractParamMap()

maxDepth = -1
stepSize = -1
maxBins = -1

#print(gbt_bestParams)

for key, value in gbt_bestParams.items():
    if 'maxDepth' in str(key):
        maxDepth = value
    elif 'stepSize' in str(key):
        stepSize = value
    elif 'maxBins' in str(key):
        maxBins = value

print('The best GradientBoostedTreeClassifierModel used the following parameters\n'+
      ' - maxDepth: {}\n'
      ' - stepSize: {}\n'
      ' - maxBins: {}\n'
      'All other parameters have not been modified by the ParamGrid.'.format(maxDepth,stepSize,maxBins))

print('The best GradientBoostedTreeClassifierModel has a mean cross-validation F1 score of {} and an F1 score of {} on the validation set.'
      .format(max(gbt_model_user.avgMetrics), used_evaluator.evaluate(gbt_model_user.bestModel.transform(val_user))))

The 64 different GradientBoostedTreeClassifierModels have performed the following F1-Scores: [0.8774091604042005, 0.8899509426936035, 0.8899509426936035, 0.8800253164940026, 0.8774091604042005, 0.8899509426936035, 0.8899509426936035, 0.8800253164940026, 0.8774091604042005, 0.8879185333288558, 0.8842256862317769, 0.874846577573598, 0.8737653245652427, 0.8873062884308968, 0.8818351117845334, 0.8819966419546567, 0.8774091604042005, 0.8662548289232026, 0.8789349157426888, 0.8634178657501967, 0.8774091604042005, 0.8573311019985164, 0.859235377540431, 0.8477845529095598, 0.8774091604042005, 0.8530738947556241, 0.8477470264618924, 0.8295172426565157, 0.8774091604042005, 0.8406124619604741, 0.8315977270684258, 0.8241379423728843, 0.8774091604042005, 0.8455354987097108, 0.8399994774323338, 0.792367875143178, 0.8774091604042005, 0.8525399431030222, 0.8373940271194795, 0.7895983539759701, 0.8774091604042005, 0.8611337220935272, 0.8379003620287254, 0.7966698231587233, 0.8774091604042005, 0.8525793

## Decision for one implementation
Determine your winning model based on test accuracy and report results on the validation set. The F1 scores are rounded to 4 decimal places.

| nb | Algorithm | Parameters | F1 (cross-validation) | F1 (validation) |
|----|---|---|---|---|
| a) | Logistic Regression | maxIter: 10, regParam: 0.0, threshold: 0.5 | 0.7657 | 0.6471 |
| b) | Random Forest | maxDepth: 5, numTrees: 10, maxBins: 10 | 0.7492 | 0.6748 |
| c) | Gradient-boosted tree classifier | maxDepth: 2, stepSize: 0.03, maxBins: 5 | 0.8900  | 0.9210 |

As the table shows, the gradient-boosted tree classifier achieves the highest F1 score in both columns. To determine whether a user wants to downgrade or leave the service, a gradient-boosted tree classifier should therefore be used.

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.