In [0]:
## Load the 'spotify' table as a Spark DataFrame
spotify_spdf = spark.read.table('spotify')


### SelectKBest Feature Selection
Uncomment the following code to make use of SelectKBest feature selection. This will reduce the number of variables in our dataset and make fitting models quicker (depending on the value of k chosen).

In [0]:
import pandas as pd

# Materialise the Spark DataFrame as a pandas DataFrame for the scikit-learn steps below.
# NOTE(review): toPandas() brings every row to the driver -- confirm the table fits in memory.
spotify_df = spotify_spdf.toPandas()

In [0]:
from sklearn.preprocessing import LabelEncoder

# Columns holding identifiers / categories rather than measurements
categorical_cols = [
    'session_id',
    'track_id',
    'context_type',
    'hist_user_behavior_reason_start',
    'hist_user_behavior_reason_end',
    'previous_track_id',
    'previous_hist_user_behavior_reason_start',
    'previous_hist_user_behavior_reason_end',
]

# A single encoder is re-fitted for each column in turn, so only the integer
# codes are kept; the per-column label mappings are not retained.
le = LabelEncoder()
spotify_df[categorical_cols] = spotify_df[categorical_cols].apply(le.fit_transform)

In [0]:
import numpy as np
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif
from sklearn.preprocessing import LabelEncoder

# Assign the features and target variable
X = spotify_df.drop('skipped', axis=1)
y = spotify_df['skipped']

# Treat +/-inf as missing, then drop incomplete rows from X and y together so
# the two stay aligned.
X = X.replace([np.inf, -np.inf], np.nan)
Xy = X.join(y).dropna()

X = Xy.drop('skipped', axis=1)
y = Xy['skipped']

# Create a label encoder
le = LabelEncoder()

# Remaining object-typed columns: encode those whose first value parses as a
# number, drop the rest (f_classif needs numeric input).
# `column_name` is used instead of `col` so the loop does not clobber the
# PySpark `col` function used elsewhere in the notebook.
for column_name in X.columns:
    if X[column_name].dtype == 'object':
        try:
            # .iloc[0] is positional: after dropna() the row labelled 0 may no
            # longer exist, so a label lookup X[column_name][0] can raise KeyError.
            float(X[column_name].iloc[0])
            X[column_name] = le.fit_transform(X[column_name])
        except (ValueError, TypeError):
            # TypeError covers values float() cannot accept at all (e.g. lists)
            X = X.drop(column_name, axis=1)

# Select the top k features using the f_classif score function
selector = SelectKBest(f_classif, k=20)
X_new = selector.fit_transform(X, y)

# Get the column names of the selected features
selected_features = X.columns[selector.get_support()]

# Create a new dataframe with only the selected features plus the label.
# Rows are taken from the original frame, so rows with missing values are
# still present here.
spotify_df_reduced = spotify_df.loc[:, selected_features]
spotify_df_reduced = spotify_df_reduced.assign(skipped=spotify_df['skipped'])

In [0]:
# Report every column of the source table that is not among the selected
# features (the 'skipped' label is listed too, since it is never a feature).
original_columns = set(spotify_spdf.columns)
selected_columns = set(selected_features)
removed_columns = original_columns - selected_columns
print(removed_columns)

{'organism', 'previous_speechiness', 'previous_acoustic_vector_5', 'previous_release_year', 'previous_mechanism', 'previous_hist_user_behavior_n_seekback', 'previous_short_pause_before_play', 'previous_tempo', 'previous_mode', 'previous_loudness', 'release_year', 'acoustic_vector_4', 'loudness', 'previous_long_pause_before_play', 'acoustic_vector_1', 'acoustic_vector_7', 'previous_valence', 'energy', 'duration', 'hist_user_behavior_n_seekfwd', 'previous_energy', 'valence', 'us_popularity_estimate', 'premium', 'track_id', 'previous_us_popularity_estimate', 'previous_acoustic_vector_2', 'acoustic_vector_2', 'key', 'previous_acoustic_vector_7', 'previous_hist_user_behavior_n_seekfwd', 'hour_of_day', 'previous_flatness', 'previous_key', 'previous_time_signature', 'time_signature', 'previous_instrumentalness', 'previous_organism', 'previous_acoustic_vector_1', 'acoustic_vector_5', 'previous_beat_strength', 'tempo', 'acousticness', 'liveness', 'previous_liveness', 'flatness', 'bounciness', '

In [0]:
# Convert the reduced pandas frame back to a Spark DataFrame and show the inferred column types
spotify_spdf_reduced = spark.createDataFrame(spotify_df_reduced)
display(spotify_spdf_reduced.dtypes)

_1,_2
session_position,int
session_length,int
context_switch,int
no_pause_before_play,int
hist_user_behavior_n_seekback,int
hist_user_behavior_is_shuffle,boolean
context_type,bigint
hist_user_behavior_reason_start,bigint
hist_user_behavior_reason_end,bigint
speechiness,float


In [0]:
from __future__ import division

# import necessary libs
import numpy  as np
import pandas as pd

# general spark modules
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import lit

# spark ml modules 
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline


# classification 
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA


from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time
import itertools
import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time

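We can split our dataset into train and test data. The first (commented-out) split line uses the full dataset; the second uses the SelectKBest-reduced dataset, so keep it active only if you are using SelectKBest.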

In [0]:
#(train_spdf, test_spdf) = spotify_spdf.randomSplit([0.7, 0.3], seed=42)
(train_spdf, test_spdf) = spotify_spdf_reduced.randomSplit([0.7, 0.3], seed=42)
# A fixed seed makes the split reproducible; without one, every re-run draws a
# different partition, so saved test data and reported metrics stop matching.
print(train_spdf.count())
print(test_spdf.count())

117413
50467


Save the test data to a Parquet file so we can use it later in our final model notebook.

In [0]:
# Persist the hold-out split as Parquet, replacing any previous copy.
# NOTE(review): the path contains "dbfs" twice ("dbfs:/dbfs/...") -- confirm this is the intended location.
test_spdf.write.format("parquet").mode("overwrite").save("dbfs:/dbfs/test_spdf")


In [0]:
# prepare data for models

def convertColumn(df, names, newType):
    """Return `df` with every column listed in `names` cast to `newType`.

    Args:
        df: DataFrame-like object supporting `df[name]` and `withColumn`.
        names: iterable of column names to cast.
        newType: target type passed to each column's `cast`.

    Returns:
        The DataFrame produced by the successive `withColumn` calls; `df`
        itself when `names` is empty.
    """
    converted = df
    for column_name in names:
        recast = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, recast)
    return converted

We store the categorical columns in a list so we can separate them from the numeric columns, then convert the numeric columns so they all have datatype float.

In [0]:
# cast numerical columns to float

categorical_candidates = ['session_id','track_id','context_type', 'hist_user_behavior_reason_start', 'hist_user_behavior_reason_end',
'previous_track_id', 'previous_hist_user_behavior_reason_start', 'previous_hist_user_behavior_reason_end']

# Build both lists from the columns actually present in the split. The split
# may come from the SelectKBest-reduced frame, which lacks many columns of
# spotify_spdf; deriving the lists from spotify_spdf would make the cast (and
# the later StringIndexer stages) fail on the missing names.
categorical_cols = [c for c in categorical_candidates if c in train_spdf.columns]
numerical_cols = [c for c in train_spdf.columns if c not in categorical_cols and c != 'skipped']
label_col = ['skipped']
train_spdf  = convertColumn(train_spdf,  numerical_cols + label_col, FloatType())
test_spdf   = convertColumn(test_spdf,   numerical_cols , FloatType())

# drop missing values if any
train_spdf  = train_spdf.dropna()
test_spdf   = test_spdf.dropna()
print(train_spdf.count())
print(test_spdf.count())

117695
50184


In [0]:
# # cast numerical columns to float

# categorical_cols = []
# #numerical_cols = (spotify_spdf.select([c for c in spotify_spdf.columns if c not in categorical_cols])).columns
# numerical_cols = (spotify_spdf_reduced.select([c for c in spotify_spdf_reduced.columns if c not in categorical_cols and c != 'skipped'])).columns
# label_col = ['skipped']
# train_spdf  = convertColumn(train_spdf,  numerical_cols + label_col, FloatType())
# test_spdf   = convertColumn(test_spdf,   numerical_cols , FloatType())

# # drop missing values if any
# train_spdf  = train_spdf.dropna()
# test_spdf   = test_spdf.dropna()
# print(train_spdf.count()) 
# print(test_spdf.count())

117479
50401


Here we are creating the string indexers for our categorical data types and adding them to the stages for our pipeline. We also create a new features column using the vector assembler

In [0]:
# One StringIndexer per categorical column; handleInvalid='keep' is set so
# labels unseen during fitting do not raise at transform time.
stages = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep')
    for name in categorical_cols
]

# Numeric columns plus the indexed categoricals are assembled into "features"
assembler_inputs = numerical_cols + [name + "_index" for name in categorical_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages.append(assembler)

In [0]:
# Check the column types after casting
display(train_spdf.dtypes)

_1,_2
session_position,float
session_length,float
context_switch,float
no_pause_before_play,float
hist_user_behavior_n_seekback,float
hist_user_behavior_is_shuffle,float
context_type,float
hist_user_behavior_reason_start,float
hist_user_behavior_reason_end,float
speechiness,float


# Model Selection

###Logistic Regression

We will use a pipeline to create and fit our model. If we want, we can add a PCA or Selector stage to the stages of our model, as below.

In [0]:
# Optional feature-reduction stages; both read the assembled "features" column
pca = PCA(inputCol="features", outputCol="pcaFeatures", k=30)
selector = ChiSqSelector(
    featuresCol="features",
    labelCol="skipped",
    outputCol="selectedFeatures",
    numTopFeatures=50,
)

In [0]:
# The model reads "pcaFeatures" (the PCA stage's output column), so the
# pipeline it is fitted in must contain the PCA stage before it.
lr = LogisticRegression(labelCol="skipped", 
                        featuresCol="pcaFeatures")

Using a parameter grid, we can use cross-validation as the final step in our pipeline to test multiple combinations of parameters for each model and choose the best possible version.

In [0]:
# Hyperparameter grid for logistic regression:
#  - regParam: regularisation strength
#  - elasticNetParam: mix between L2 (0.0) and L1 (1.0) penalties
#  - maxIter: maximum number of optimiser iterations
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.5, 2.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .addGrid(lr.maxIter, [10, 50, 100])
    .build()
)

# Evaluators: area under ROC drives model selection in the CrossValidator;
# accuracy is reported alongside it on the hold-out set.
bcevaluator = BinaryClassificationEvaluator(
    labelCol='skipped', rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
mcEvaluator = MulticlassClassificationEvaluator(labelCol='skipped', metricName="accuracy")

# Cross-validated search over the grid
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=bcevaluator)

In [0]:
%%time
# stagesLR = stages  + [pca] + [cv]
# stagesLR = stages  + [selector] + [cv]
stagesLR = stages  + [cv]
full_pipeline = Pipeline(stages=stagesLR)
pipelineModel = full_pipeline.fit(train_spdf)

We then use our best model from the CV stage to transform our hold out test set and assess its performance using our evaluator

In [0]:
# Score the hold-out split with the fitted pipeline
cvPredLR = pipelineModel.transform(test_spdf)

In [0]:
# Hold-out metrics for the logistic-regression pipeline
lr_roc_auc = bcevaluator.evaluate(cvPredLR)
print(f"Area under ROC curve: {lr_roc_auc}")
lr_accuracy = mcEvaluator.evaluate(cvPredLR)
print(f"Accuracy: {lr_accuracy}")

In [0]:
# AUC from an evaluator left on its default metric
evaluator = BinaryClassificationEvaluator(
    labelCol='skipped',
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(cvPredLR)
print('AUC: ', auc)

In [0]:
# stages[-2] is the fitted stage immediately before the CrossValidator model.
# NOTE(review): explainedVariance is only available when that stage is a fitted PCA model.
print("Explained variance by principal components:",pipelineModel.stages[-2].explainedVariance)

In [0]:
# Expose the predictions to SQL cells under the name finalPredictionsLR
cvPredLR.createOrReplaceTempView("finalPredictionsLR")

In [0]:
# -- %sql
# -- SELECT us_popularity_estimate, prediction, count(*) AS count
# -- FROM finalPredictionsLR
# -- GROUP BY us_popularity_estimate, prediction
# -- ORDER BY us_popularity_estimate DESC

In [0]:
# best_model = cv.bestModel
# params = best_model.extractParamMap()
# AUC = bcevaluator.evaluate(best_model)
# print("Model - AUC: {}".format(AUC))
# print("Model - Parameters: {}".format(params))

### Decision Trees

When using DTs, we cannot have categorical features with more than 50 possible values so we must remove the columns with such issues

In [0]:
from pyspark.sql.functions import col, countDistinct

# Columns typed as string or boolean are treated as categorical
categorical_cols = [name for name, dtype in train_spdf.dtypes if dtype in ('string', 'boolean')]

# One-row summary: number of distinct values in each categorical column
distinct_counts = [countDistinct(col(name)).alias(name) for name in categorical_cols]
train_spdf.select(*categorical_cols).agg(*distinct_counts).show()

+----------+------------+-------------------------------+-----------------------------+--------+----------------------------------------+--------------------------------------+-----------------+
|session_id|context_type|hist_user_behavior_reason_start|hist_user_behavior_reason_end|track_id|previous_hist_user_behavior_reason_start|previous_hist_user_behavior_reason_end|previous_track_id|
+----------+------------+-------------------------------+-----------------------------+--------+----------------------------------------+--------------------------------------+-----------------+
|     10000|           6|                              9|                            7|   40540|                                       9|                                     7|            40474|
+----------+------------+-------------------------------+-----------------------------+--------+----------------------------------------+--------------------------------------+-----------------+



In [0]:
# Remove columns that are excluded from the tree models (the intent stated
# here is that columns with too many unique categories do not work for tree
# algorithms).

#(train_spdf, test_spdf) = spotify_spdf.randomSplit([0.7, 0.3])
columns_to_drop = [
    'session_id',
    'track_id',
    'previous_track_id',
    'hist_user_behavior_reason_end',
    'hist_user_behavior_reason_start',
    'previous_hist_user_behavior_reason_end',
]
train_spdf, test_spdf = (frame.drop(*columns_to_drop) for frame in (train_spdf, test_spdf))
print(train_spdf.count())
print(test_spdf.count())

117413
50467


In [0]:
# Cast numerical columns to float.
# Only one categorical column is kept active for the tree models; the other
# candidates ('context_type', 'hist_user_behavior_reason_start',
# 'hist_user_behavior_reason_end', 'previous_hist_user_behavior_reason_end')
# are disabled here.
categorical_cols = ['previous_hist_user_behavior_reason_start']
numerical_cols = train_spdf.select(
    [name for name in train_spdf.columns if name not in categorical_cols and name != 'skipped']
).columns
label_col = ['skipped']
train_spdf = convertColumn(train_spdf, numerical_cols + label_col, FloatType())
test_spdf = convertColumn(test_spdf, numerical_cols, FloatType())

# Rows containing missing values are dropped (not filled)
train_spdf = train_spdf.dropna()
test_spdf = test_spdf.dropna()

# Rebuild the preprocessing stages for the reduced column set
stages = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid='keep')
    for name in categorical_cols
]

assembler_inputs = numerical_cols + [name + "_index" for name in categorical_cols]
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

stages.append(assembler)

In [0]:
# Row counts after casting and dropping incomplete rows
print(train_spdf.count()) 
print(test_spdf.count())

117413
50466


In [0]:
# Decision tree trained on the assembled "features" column with "skipped" as the label
dt = DecisionTreeClassifier(labelCol="skipped", featuresCol="features")

In [0]:
# Hyperparameter grid for the decision tree:
#  - maxDepth: maximum depth of the tree
#  - maxBins: number of bins used when discretising features
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [15, 30])
    .addGrid(dt.maxBins, [10, 60, 80])
    .build()
)

# Evaluators: area under ROC drives model selection in the CrossValidator;
# accuracy is reported alongside it on the hold-out set.
bcevaluator = BinaryClassificationEvaluator(
    labelCol='skipped', rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
mcEvaluator = MulticlassClassificationEvaluator(labelCol='skipped', metricName="accuracy")

# Cross-validated search over the grid
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=bcevaluator)

In [0]:
# Optional feature-reduction stages (not part of the decision-tree pipeline fitted below)
pca = PCA(inputCol="features", outputCol="pcaFeatures", k=50)
selector = ChiSqSelector(
    featuresCol="features",
    labelCol="skipped",
    outputCol="selectedFeatures",
    numTopFeatures=50,
)

In [0]:
%%time
start_time =time.time()
stagesDT = stages + [cv]
full_pipeline = Pipeline(stages=stagesDT)
pipelineModel = full_pipeline.fit(train_spdf)
print( "--- %s seconds ---" % (time.time() - start_time))

--- 2851.278951883316 seconds ---
CPU times: user 3.81 s, sys: 2.33 s, total: 6.14 s
Wall time: 47min 31s


We can save our final model for later use in our final model notebook; this is similar to how a data team might work in a production environment.

In [0]:
bestModel = pipelineModel.stages[-1].bestModel
from pyspark.ml.util import MLWriter

# Save the fitted pipeline, replacing any previous copy in one step.
# The earlier pattern removed the old model with dbutils.fs.rm and then called
# save(); the two calls are not guaranteed to resolve the relative path
# "dbfs/final_model" to the same location, in which case save() fails because
# the target still exists. write().overwrite() deletes and writes exactly the
# same path.
pipelineModel.write().overwrite().save("dbfs/final_model")

In [0]:
# Score the hold-out split with the fitted decision-tree pipeline
cvPredDT = pipelineModel.transform(test_spdf)

In [0]:
# print("Explained variance by principal components:",pipelineModel.stages[-2].explainedVariance)

In [0]:
# Hold-out metrics for the decision-tree pipeline
dt_roc_auc = bcevaluator.evaluate(cvPredDT)
print(f"Area under ROC curve: {dt_roc_auc}")
dt_accuracy = mcEvaluator.evaluate(cvPredDT)
print(f"Accuracy: {dt_accuracy}")

Area under ROC curve: 0.7063582404355624
Accuracy: 0.7058613720128404


We can look at the feature importances using our DT model

In [0]:
# Rank the assembled input columns by the best tree's feature importance.
bestModel = pipelineModel.stages[-1].bestModel
importances = bestModel.featureImportances
importance_values = importances.toArray()

# zip() silently truncates to the shorter input, so a mismatch here means the
# names printed below would be attached to the wrong weights (e.g. if
# assembler_inputs was rebuilt after the model was fitted).
if len(importance_values) != len(assembler_inputs):
    print("WARNING: %d importances but %d assembler inputs; names may be misaligned"
          % (len(importance_values), len(assembler_inputs)))

# Each pair is (importance, feature name), sorted by importance, largest first
sorted_importances = sorted(zip(importance_values, assembler_inputs), key=lambda pair: pair[0], reverse=True)
for importance, feature in sorted_importances:
    print(importance, feature)

0.45612537532148617 previous_skipped
0.07572138245128957 session_position
0.0566950451811663 previous_hist_user_behavior_reason_start_index
0.05268523443572859 speechiness
0.05036111390550791 acoustic_vector_0
0.045604983361696355 previous_duration
0.04535211037931124 previous_acousticness
0.036720448655432725 previous_acoustic_vector_6
0.03494277086039699 previous_acoustic_vector_0
0.03453264018720878 session_length
0.03296030505455206 previous_no_pause_before_play
0.01976568651483101 no_pause_before_play
0.018082404608740603 context_type
0.01602118249556911 context_switch
0.015886105289590398 hist_user_behavior_n_seekback
0.006075848654735707 hist_user_behavior_is_shuffle
0.0024673626427565215 previous_hist_user_behavior_is_shuffle


We can also visualise the decision tree to better understand how it got to its output

In [0]:
# Text dump of the fitted tree's decision rules (features are referenced by index)
print(bestModel.toDebugString)


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_7358d5ad032f, depth=15, numNodes=7885, numClasses=2, numFeatures=74
  If (feature 41 <= 0.5)
   If (feature 73 in {0.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0})
    If (feature 3 <= 0.5)
     If (feature 6 <= 0.5)
      If (feature 72 in {2.0,3.0,4.0,5.0})
       If (feature 33 <= -0.132774256169796)
        If (feature 11 <= 99.99957656860352)
         If (feature 42 <= 241.8919677734375)
          If (feature 1 <= 17.5)
           If (feature 62 <= 0.4947872757911682)
            If (feature 33 <= -0.22053587436676025)
             If (feature 42 <= 219.64127349853516)
              If (feature 51 <= 0.9843024611473083)
               If (feature 43 <= 2011.5)
                Predict: 1.0
               Else (feature 43 > 2011.5)
                Predict: 0.0
              Else (feature 51 > 0.9843024611473083)
               If (feature 55 <= -4.88100004196167)
                If (feature 19 <= -10.512499809265137)
                 Pred

In [0]:
# Expose the predictions to SQL cells under the name finalPredictionsDT
cvPredDT.createOrReplaceTempView("finalPredictionsDT")

In [0]:
%sql
-- Number of predictions per (us_popularity_estimate, prediction) pair, highest popularity first.
-- NOTE(review): requires us_popularity_estimate to still be a column of the predictions view -- confirm.
SELECT us_popularity_estimate, prediction, count(*) AS count
FROM finalPredictionsDT
GROUP BY us_popularity_estimate, prediction
ORDER BY us_popularity_estimate DESC

### Gradient-Boosted Trees (GBTs)

In [0]:
# Gradient-boosted trees trained on the assembled "features" column with "skipped" as the label
gbt = GBTClassifier(labelCol="skipped",featuresCol='features')

In [0]:
# Hyperparameter grid for gradient-boosted trees:
#  - maxDepth: maximum depth of each decision tree
#  - maxIter: iterations, or the total number of trees
paramGrid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5])
    .addGrid(gbt.maxIter, [10, 100])
    .build()
)

# Evaluators: area under ROC drives model selection in the CrossValidator;
# accuracy is reported alongside it on the hold-out set.
bcevaluator = BinaryClassificationEvaluator(
    labelCol='skipped', rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
mcEvaluator = MulticlassClassificationEvaluator(labelCol='skipped', metricName="accuracy")

# Cross-validated search over the grid
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=bcevaluator)

In [0]:
# 3-component PCA stage; the GBT pipeline fitted below does not include it
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")

In [0]:
# Preprocessing stages followed by the cross-validated GBT
stagesGBT = [*stages, cv]
full_pipeline = Pipeline(stages=stagesGBT)
pipelineModel = full_pipeline.fit(train_spdf)

In [0]:
# print("Explained variance by principal components:",pipelineModel.stages[-2].explainedVariance)

In [0]:
# Score the hold-out split with the fitted GBT pipeline
cvPredGBT = pipelineModel.transform(test_spdf)

In [0]:
# Hold-out metrics for the GBT pipeline
gbt_roc_auc = bcevaluator.evaluate(cvPredGBT)
print(f"Area under ROC curve: {gbt_roc_auc}")
gbt_accuracy = mcEvaluator.evaluate(cvPredGBT)
print(f"Accuracy: {gbt_accuracy}")

In [0]:
# AUC from an evaluator left on its default metric
evaluator = BinaryClassificationEvaluator(
    labelCol='skipped',
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(cvPredGBT)
print('AUC: ', auc)

In [0]:
# cvPredGBT.createOrReplaceTempView("finalPredictionsGBT")

In [0]:
# %sql
# SELECT us_popularity_estimate, prediction, count(*) AS count
# FROM finalPredictionsGBT
# GROUP BY us_popularity_estimate, prediction
# ORDER BY us_popularity_estimate DESC

In [0]:
# best_model = cv.bestModel
# params = best_model.extractParamMap()
# AUC = bcevaluator.evaluate(best_model)
# print("Model - AUC: {}".format(AUC))
# print("Model - Parameters: {}".format(params))

### Support Vector Machine

In [0]:
# 3-component PCA stage writing "pcaFeatures", the column the LinearSVC below reads
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")

In [0]:
%%time
svm = LinearSVC(labelCol="skipped", featuresCol='pcaFeatures')

# Define a grid of hyperparameters to test:

paramGrid = ParamGridBuilder()\
    .addGrid(svm.regParam, [0.01, 0.1])\
    .addGrid(svm.maxIter, [10, 100])\
    .addGrid(svm.fitIntercept, [True])\
    .addGrid(svm.aggregationDepth, [2])\
    .build()

bcevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction", labelCol='skipped')
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy",labelCol='skipped')

# Declare the CrossValidator, which performs the model tuning.
cv = CrossValidator(estimator=svm, evaluator=bcevaluator, estimatorParamMaps=paramGrid)

stagesSVM = stages  + [cv]
full_pipeline = Pipeline(stages=stagesSVM)
pipelineModel = full_pipeline.fit(train_spdf)

In [0]:
# Score the hold-out split with the fitted SVM pipeline
cvPredSVM = pipelineModel.transform(test_spdf)

In [0]:
# Hold-out metrics for the SVM pipeline
svm_roc_auc = bcevaluator.evaluate(cvPredSVM)
print(f"Area under ROC curve: {svm_roc_auc}")
svm_accuracy = mcEvaluator.evaluate(cvPredSVM)
print(f"Accuracy: {svm_accuracy}")

In [0]:
# AUC from an evaluator left on its default metric
evaluator = BinaryClassificationEvaluator(
    labelCol='skipped',
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(cvPredSVM)
print('AUC: ', auc)

In [0]:
# Expose the predictions to SQL cells under the name finalPredictionsSVM
cvPredSVM.createOrReplaceTempView("finalPredictionsSVM")

In [0]:
# %sql
# SELECT us_popularity_estimate, prediction, count(*) AS count
# FROM finalPredictionsSVM
# GROUP BY us_popularity_estimate, prediction
# ORDER BY us_popularity_estimate DESC

In [0]:
# best_model = cv.bestModel
# params = best_model.extractParamMap()
# AUC = bcevaluator.evaluate(best_model)
# print("Model - AUC: {}".format(AUC))
# print("Model - Parameters: {}".format(params))

### Naive Bayes Classifier

In [0]:
# Number of training rows (shown as the cell result)
train_spdf.count()

In [0]:
# 3-component PCA stage; the Naive Bayes pipeline fitted below does not include it
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")

In [0]:
%%time
nb = NaiveBayes(labelCol="skipped", featuresCol='features')

paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()

bcevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction", labelCol='skipped')
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy",labelCol='skipped')

cv = CrossValidator(estimator=nb, evaluator=bcevaluator, estimatorParamMaps=paramGrid)

stagesNB = stages  + [cv]
full_pipelineNB = Pipeline(stages=stagesNB)
pipelineModelNB = full_pipelineNB.fit(train_spdf)

In [0]:
# Column names fed to the VectorAssembler (shown as the cell result)
assembler_inputs

In [0]:
# NOTE(review): this reads pipelineModel, not the pipelineModelNB fitted in this section,
# and explainedVariance is only available when stages[-2] is a fitted PCA model -- confirm intent.
explained_variance = pipelineModel.stages[-2].explainedVariance
print(explained_variance)

In [0]:
# Score the hold-out split with the Naive Bayes pipeline fitted in this section
# (pipelineModelNB). Using pipelineModel here would evaluate the model left
# over from the previous section instead.
cvPredNB = pipelineModelNB.transform(test_spdf)

print(f"Area under ROC curve: {bcevaluator.evaluate(cvPredNB)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredNB)}")

In [0]:
# AUC from an evaluator left on its default metric
evaluator = BinaryClassificationEvaluator(
    labelCol='skipped',
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(cvPredNB)
print('AUC: ', auc)

In [0]:
# Expose the predictions to SQL cells under the name finalPredictionsNB
cvPredNB.createOrReplaceTempView("finalPredictionsNB")

### LightGBM Classifier

In [0]:
pip install mmlspark

In [0]:
# The classifier must be imported: importing only LightGBMRegressor leaves the
# name LightGBMClassifier undefined and the next line raises NameError.
# NOTE(review): some mmlspark releases expose these classes under
# mmlspark.lightgbm instead -- confirm against the installed version.
from mmlspark import LightGBMClassifier, LightGBMRegressor

lgbm = LightGBMClassifier(labelCol="skipped", featuresCol='features')


In [0]:
# Tune LightGBM with cross-validation inside an MLflow run, then log the
# selected hyperparameters and the hold-out metrics.
with mlflow.start_run():
    # Grid over LightGBM's own parameters. (regParam, maxIter, fitIntercept and
    # aggregationDepth belong to LinearSVC and are not attributes of the
    # LightGBM estimator.)
    # NOTE(review): learningRate / numIterations are the analogous LightGBM
    # knobs -- confirm the names against the installed mmlspark version.
    paramGrid = ParamGridBuilder()\
        .addGrid(lgbm.learningRate, [0.01, 0.1])\
        .addGrid(lgbm.numIterations, [10, 100])\
        .build()

    bcevaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction", labelCol='skipped')
    mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol='skipped')

    # Declare the CrossValidator, which performs the model tuning.
    cv = CrossValidator(estimator=lgbm, evaluator=bcevaluator, estimatorParamMaps=paramGrid)

    # Build a separate stage list: re-assigning `stages` would append the
    # cross-validator to the shared preprocessing stages and break every later
    # cell that builds a pipeline from them.
    stagesLGBM = stages + [cv]
    full_pipeline = Pipeline(stages=stagesLGBM)
    pipelineModel = full_pipeline.fit(train_spdf)

    # Extract the best model from the CrossValidator
    cvModel = pipelineModel.stages[-1]
    bestModel = cvModel.bestModel

    # Log the winning grid point as plain values. avgMetrics is ordered like
    # paramGrid, and a larger area under ROC is better.
    best_index = int(np.argmax(cvModel.avgMetrics))
    for param, value in paramGrid[best_index].items():
        mlflow.log_param(param.name, value)

    # Evaluate through the whole fitted pipeline: test_spdf has no "features"
    # column until the indexer/assembler stages have run, so calling
    # bestModel.transform on it directly fails.
    predictions = pipelineModel.transform(test_spdf)
    # Log the evaluation metrics
    mlflow.log_metric("areaUnderROC", bcevaluator.evaluate(predictions))
    mlflow.log_metric("accuracy", mcEvaluator.evaluate(predictions))


In [0]:
# Refit with a PCA stage inserted before the cross-validator.
# NOTE(review): lgbm was created with featuresCol='features', so the PCA output column
# does not appear to be consumed by the model here -- confirm intent.
stagesLGBM = stages + [pca] + [cv]
full_pipeline = Pipeline(stages=stagesLGBM)
pipelineModel = full_pipeline.fit(train_spdf)

In [0]:
# Score the hold-out split with the fitted LightGBM pipeline and report AUC
cvPredLGBM = pipelineModel.transform(test_spdf)

lgbm_roc_auc = bcevaluator.evaluate(cvPredLGBM)
print(f"Area under ROC curve: {lgbm_roc_auc}")

###Random Forest

In [0]:
# define parameters
numTrees         = [5, 10, 50, 100]
subsamplingRate  = [0.8]
maxDepth         = [10, 15]
experiments      = list(itertools.product(numTrees, maxDepth, subsamplingRate))
print(len(experiments))

In [0]:
# The notebook's import cell imports sklearn's RandomForestClassifier after the
# Spark one, so the bare name refers to the sklearn class, which does not
# accept labelCol/featuresCol. Bind the Spark estimator under its own name.
from pyspark.ml.classification import RandomForestClassifier as SparkRandomForestClassifier

# Fit and evaluate one random forest per hyperparameter combination.
# The tuple is unpacked into new names so the numTrees / maxDepth /
# subsamplingRate lists are not overwritten and the cell can be re-run.
for ind, (n_trees, max_depth, subsampling_rate) in enumerate(experiments):
    start_time = time.time()
    print(ind)
    print('params: ', n_trees, max_depth, subsampling_rate)

    # Create an initial RandomForest model.
    rf = SparkRandomForestClassifier(labelCol="skipped", featuresCol="features",
                                     numTrees=n_trees,
                                     maxDepth=max_depth,
                                     subsamplingRate=subsampling_rate)

    # Train model with Training Data. The pipeline must be built from stagesRF:
    # building it from `stages` leaves the forest out, so nothing is trained
    # and no prediction columns are produced.
    stagesRF = stages + [rf]
    rfpipeline = Pipeline(stages=stagesRF)
    rfModel = rfpipeline.fit(train_spdf)

    # Make predictions on the hold-out split using the fitted pipeline
    predictions = rfModel.transform(test_spdf)

    # evaluate predictions
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol='skipped')
    mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy",labelCol='skipped')
    auc       = evaluator.evaluate(predictions)

    print('AUC: ', auc)
    print(f"Accuracy: {mcEvaluator.evaluate(predictions)}")
    print("--- %s seconds ---" % (time.time() - start_time))

In [0]:
# Score the hold-out split with the last random forest fitted above and report
# its metrics. The metrics must be computed on cvPredRF; evaluating cvPredNB
# would repeat the Naive Bayes results under the random-forest heading.
cvPredRF = rfModel.transform(test_spdf)

print(f"Area under ROC curve: {bcevaluator.evaluate(cvPredRF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredRF)}")

In [0]:
#test