# Classification in PySpark's MLlib Project Solution

### Genre classification
Now it's time to leverage what we learned in the lectures to a REAL classification project! Have you ever wondered what makes us, humans, able to tell apart two songs of different genres? How we do we inherenly know the difference between a pop song and heavy metal? This type of classifcation may seem easy for us, but it's a very difficult challenge for a computer to do. So the question is, could an automatic genre classifcation model be possible? 

For this project we will be classifying songs based on a number of characteristics into a set of 23 electronic genres. This technology could be used by an application like Pandora to recommend songs to users or just create meaningful channels. Super fun!

### Dataset
*beatsdataset.csv*
Each row is an electronic music song. The dataset contains 100 song for each genre among 23 electronic music genres, they were the top (100) songs of their genres on November 2016. The 71 columns are audio features extracted of a two random minutes sample of the file audio. These features have been extracted using pyAudioAnalysis (https://github.com/tyiannak/pyAudioAnalysis).

### Your task
Create an algorithm that classifies songs into the 23 genres provided. Test out several different models and select the highest performing one. Also play around with feature selection methods and finally try to make a recommendation to a user.  

For the feature selection aspect of this project, you may need to get a bit creative if you want to select features from a non-tree algorithm. I did not go over this aspect of PySpark intentionally in the previous lectures to give you chance to get used to researching the PySpark documentation page. Here is the link to the Feature Selectors section of the documentation that just might come in handy: https://spark.apache.org/docs/latest/ml-features.html#feature-selectors

Good luck! Have fun :)

### Source
https://www.kaggle.com/caparrini/beatsdataset

In [61]:
# Read in functions we will need
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

In [62]:
# First let's create our PySpark instance
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("Classification").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark
# Click the hyperlinked "Spark UI" link to view details about your Spark session

You are working with 1 core(s)


In [63]:
path =""
df = spark.read.csv(path+'beatsdataset.csv',inferSchema=True,header=True)

In [64]:
df.limit(6).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.001529,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.001591,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.003945,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom
5,5,0.127047,0.153488,3.221987,0.261693,0.257361,1.090034,0.004943,0.230099,-21.234846,...,0.002986,0.006533,0.010347,0.025008,0.003035,0.019479,133.333333,0.168933,129.0,BigRoom


In [65]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (null

In [66]:
df.groupBy("class").count().show()

+--------------------+-----+
|               class|count|
+--------------------+-----+
|           PsyTrance|  100|
|           HardDance|  100|
|              Breaks|  100|
|  HardcoreHardTechno|  100|
|   IndieDanceNuDisco|  100|
|              Trance|  100|
|           DeepHouse|  100|
|ElectronicaDowntempo|  100|
|           ReggaeDub|  100|
|             Minimal|  100|
|         DrumAndBass|  100|
|             Dubstep|  100|
|             BigRoom|  100|
|              Techno|  100|
|               House|  100|
|         FutureHouse|  100|
|        ElectroHouse|  100|
|           GlitchHop|  100|
|           TechHouse|  100|
|              HipHop|  100|
+--------------------+-----+
only showing top 20 rows



In [67]:
input_columns = df.columns # Collect the column names as a list
input_columns = input_columns[1:-1] # keep only relevant columns: from column 1 to 

dependent_var = 'class'

In [68]:
# change label (class variable) to string type to prep for reindexing
# Pyspark is expecting a zero indexed integer for the label column. 
# Just in case our data is not in that format... we will treat it by using the StringIndexer built in method
renamed = df.withColumn("label_str", df[dependent_var].cast(StringType())) #Rename and change to string type
indexer = StringIndexer(inputCol="label_str", outputCol="label") #Pyspark is expecting the this naming convention 
indexed = indexer.fit(renamed).transform(renamed)

In [69]:
# Convert all string type data in the input column list to numeric
# Otherwise the Algorithm will not be able to process it

# Also we will use these lists later on
numeric_inputs = []
string_inputs = []
for column in input_columns:
    # First identify the string vars in your input column list
    if str(indexed.schema[column].dataType) == 'StringType':
        # Set up your String Indexer function
        indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
        # Then call on the indexer you created here
        indexed = indexer.fit(indexed).transform(indexed)
        # Rename the column to a new name so you can disinguish it from the original
        new_col_name = column+"_num"
        # Add the new column name to the string inputs list
        string_inputs.append(new_col_name)
    else:
        # If no change was needed, take no action 
        # And add the numeric var to the num list
        numeric_inputs.append(column)

In [70]:
numeric_inputs,string_inputs

(['1-ZCRm',
  '2-Energym',
  '3-EnergyEntropym',
  '4-SpectralCentroidm',
  '5-SpectralSpreadm',
  '6-SpectralEntropym',
  '7-SpectralFluxm',
  '8-SpectralRolloffm',
  '9-MFCCs1m',
  '10-MFCCs2m',
  '11-MFCCs3m',
  '12-MFCCs4m',
  '13-MFCCs5m',
  '14-MFCCs6m',
  '15-MFCCs7m',
  '16-MFCCs8m',
  '17-MFCCs9m',
  '18-MFCCs10m',
  '19-MFCCs11m',
  '20-MFCCs12m',
  '21-MFCCs13m',
  '22-ChromaVector1m',
  '23-ChromaVector2m',
  '24-ChromaVector3m',
  '25-ChromaVector4m',
  '26-ChromaVector5m',
  '27-ChromaVector6m',
  '28-ChromaVector7m',
  '29-ChromaVector8m',
  '30-ChromaVector9m',
  '31-ChromaVector10m',
  '32-ChromaVector11m',
  '33-ChromaVector12m',
  '34-ChromaDeviationm',
  '35-ZCRstd',
  '36-Energystd',
  '37-EnergyEntropystd',
  '38-SpectralCentroidstd',
  '39-SpectralSpreadstd',
  '40-SpectralEntropystd',
  '41-SpectralFluxstd',
  '42-SpectralRolloffstd',
  '43-MFCCs1std',
  '44-MFCCs2std',
  '45-MFCCs3std',
  '46-MFCCs4std',
  '47-MFCCs5std',
  '48-MFCCs6std',
  '49-MFCCs7std',
  '

In [71]:
# Treat for skewness
# Flooring and capping
# Plus if right skew take the log +1
# if left skew do exp transformation
# This is best practice

# create empty dictionary d
d = {}
# Create a dictionary of quantiles from your numeric cols
# I'm doing the top and bottom 1% but you can adjust if needed
for col in numeric_inputs: 
    d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) #if you want to make it go faster increase the last number

#Now check for skewness for all numeric cols
for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect() #check for skewness
    skew = skew[0][0]
    # If skewness is found,
    # This function will make the appropriate corrections
    if skew > 1: # If right skew, floor, cap and log(x+1)
        indexed = indexed.withColumn(col, \
        log(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1: # If left skew floor, cap and exp(x)
        indexed = indexed.withColumn(col, \
        exp(when(df[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

7-SpectralFluxm has been treated for positive (right) skewness. (skew =) 1.6396138160129063 )
22-ChromaVector1m has been treated for positive (right) skewness. (skew =) 2.4162415204309258 )
23-ChromaVector2m has been treated for positive (right) skewness. (skew =) 4.154796693680583 )
24-ChromaVector3m has been treated for positive (right) skewness. (skew =) 1.1974019617504328 )
25-ChromaVector4m has been treated for positive (right) skewness. (skew =) 2.446635863594906 )
26-ChromaVector5m has been treated for positive (right) skewness. (skew =) 2.154482876187508 )
27-ChromaVector6m has been treated for positive (right) skewness. (skew =) 2.01234064472543 )
28-ChromaVector7m has been treated for positive (right) skewness. (skew =) 1.1829228989215521 )
29-ChromaVector8m has been treated for positive (right) skewness. (skew =) 3.7372643733999955 )
30-ChromaVector9m has been treated for positive (right) skewness. (skew =) 2.4117416421548645 )
31-ChromaVector10m has been treated for positiv

In [72]:
# Now check for negative values in the dataframe. 
# Produce a warning if there are negative values in the dataframe that Naive Bayes cannot be used. 
# Note: we only need to check the numeric input values since anything that is indexed won't have negative values

# Calculate the mins for all columns in the df
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs]) 
# Create an array for all mins and select only the input cols
min_array = minimums.select(array(numeric_inputs).alias("mins")) 
# Collect golobal min as Python object
df_minimum = min_array.select(array_min(min_array.mins)).collect() 
# Slice to get the number itself
df_minimum = df_minimum[0][0] 

# If there are ANY negative vals found in the df, print a warning message
if df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")



In [73]:
df.limit(5).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.001529,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.001591,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.003945,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom


In [74]:
# Before we correct for negative values that may have been found above, 
# We need to vectorize our df
# becauase the function that we use to make that correction requires a vector. 
# Now create your final features list
features_list = numeric_inputs + string_inputs
# Create your vector assembler object
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
# And call on the vector assembler to transform your dataframe
output = assembler.transform(indexed).select('features','label')

In [75]:
# Create the mix max scaler object 
# This is what will correct for negative values
# I like to use a high range like 1,000 
#     because I only see one decimal place in the final_data.show() call
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(output)

# rescale each feature to range [min, max].
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
# Rename to default value
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show(10)

Features scaled to range: [0.000000, 1000.000000]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[519.818266700239...|
|  0.0|[435.295463992565...|
|  0.0|[297.057129121742...|
|  0.0|[374.352647734368...|
|  0.0|[586.432337466259...|
|  0.0|[478.897323979332...|
|  0.0|[462.989461602348...|
|  0.0|[535.448873531282...|
|  0.0|[437.894973202184...|
|  0.0|[524.003195633498...|
+-----+--------------------+
only showing top 10 rows



In [76]:
train,test = final_data.randomSplit([0.7,0.3])
seed = 40
train_val = 0.7
test_val = 1-train_val
train,test = final_data.randomSplit([train_val,test_val],seed=seed)

In [77]:
# Read in dependencies
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import warnings

# Mlflow libaries
import mlflow
from mlflow import spark

In [78]:
# Set experiment
# This will actually automatically create one if the one you call on doesn't exist
mlflow.set_experiment(experiment_name = "Experiment-3")

# set up your client
from  mlflow.tracking import MlflowClient
client = MlflowClient()

In [79]:
# Create a run and attach it to the experiment you just created
experiments = client.list_experiments() # returns a list of mlflow.entities.Experiment

experiment_name = "Experiment-3"
def create_run(experiment_name):
    mlflow.set_experiment(experiment_name = experiment_name)
    for x in experiments:
        if experiment_name in x.name:
#             print(experiment_name)
#             print(x)
            experiment_index = experiments.index(x)
            run = client.create_run(experiments[experiment_index].experiment_id) # returns mlflow.entities.Run
#             print(run)
            return run

# Example run command
# run = create_run('Experiment-3')
# run = create_run(experiment_name)

In [80]:
# test the functionality here
run = create_run('Experiment-3')
#dummy run
# Add tag to a run
client.set_tag(run.info.run_id, "Algorithm", "Gradient Boosted Tree")
client.set_tag(run.info.run_id,"Random Seed",908)
client.set_tag(run.info.run_id,"Train Perct",0.7)

# Add params and metrics to a run
client.log_param(run.info.run_id, "Max Depth", 90)
client.log_param(run.info.run_id, "Max Bins", 50)
client.log_metric(run.info.run_id, "Accuracy", 0.87)

# Terminate the client
client.set_terminated(run.info.run_id)

In [81]:
# Set up our evaluation objects
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction') #labelCol='label'
# Bin_evaluator = BinaryClassificationEvaluator() #labelCol='label'
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # redictionCol="prediction",

In [84]:
run = create_run(experiment_name)

# This method uses cross validation and allows for hyperparamter tuning via grid searching
classifier = LogisticRegression()

# Set up your parameter grid for the cross validator to consudt hyperparameter tuning
paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
             .addGrid(classifier.maxIter, [10, 15,20])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Collect the best model and
# print the coefficient matrix
# These values should be compared relative to eachother
# And intercepts can be prepared to other models
BestModel = fitModel.bestModel
print("Intercept: " + str(BestModel.interceptVector))
print("Coefficients: \n" + str(BestModel.coefficientMatrix))

# Generate predictions
# fitModel automatically uses the best model 
# so we don't need to use BestModel here
predictions = fitModel.transform(test)

# Now print the accuract rate of the model or AUC for a binary classifier
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
paramMap = BestModel.extractParamMap()

# Log parameters to the client
for key, val in paramMap.items():
    if 'maxIter' in key.name:
        client.log_param(run.info.run_id, "Max Iter", val)
for key, val in paramMap.items():
    if 'regParam' in key.name:
        client.log_param(run.info.run_id, "Reg Param", val)

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

Intercept: [-0.02149608036230148,0.06823021549273668,0.05089599626183121,-0.016015264794603617,0.0012217647520220431,-0.12228572510856088,0.03143508246725725,-0.032106564938455574,0.08702223045752905,-0.12904354673129048,0.061869293594756646,0.039890675771671755,0.14360464619554844,-0.047599785262046564,-0.0025730922846182096,0.023672537191425,-0.10930463816432477,-0.1338061036542189,0.01679906966017987,-0.06403816986208058,0.025181359666381558,0.05413847311922635,0.07430762653193526]
Coefficients: 
DenseMatrix([[ 4.55588959e-04,  1.53307412e-03,  1.65412860e-03, ...,
              -1.59910181e-03, -6.90430408e-04,  1.56058217e-03],
             [-2.15195846e-04, -1.57164476e-04,  2.75060835e-04, ...,
               3.25830421e-03, -2.55229466e-03,  2.12409079e-03],
             [-3.93704164e-04,  1.08456676e-04,  7.70871582e-04, ...,
              -2.32575574e-03,  3.75468755e-04, -1.02683565e-03],
             ...,
             [ 4.52395004e-04, -8.01739208e-04, -3.05974317e-04, ...,

In [85]:
# Create a new run
run = create_run(experiment_name)

# instantiate the base classifier.
lr = LogisticRegression()
# instantiate the One Vs Rest Classifier.
classifier = OneVsRest(classifier=lr)

# Add parameters of your choice here:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()
#Cross Validator requires the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 is best practice

# Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Print the Coefficients
# First we need to extract the best model from fit model

# Get Best Model
BestModel = fitModel.bestModel
# Extract list of binary models
models = BestModel.models
for model in models:
    print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)
        
# Now generate predictions on test dataset
predictions = fitModel.transform(test)
# And calculate the accuracy score
accuracy = (MC_evaluator.evaluate(predictions))*100
# And print
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
paramMap = BestModel.extractParamMap()

# Log parameters to the client
for key, val in paramMap.items():
    if 'maxIter' in key.name:
        client.log_param(run.info.run_id, "Max Iter", val)
for key, val in paramMap.items():
    if 'regParam' in key.name:
        client.log_param(run.info.run_id, "Reg Param", 5)

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

[1mIntercept: [0m -5.799228550714 [1m
Coefficients:[0m [-0.00046422849831339024,0.0014960355186393627,0.0016556173009855955,-0.00010322012542998654,0.002117418074172022,0.0004247642395075349,-0.00161574817649254,-0.0007420628071019194,0.003946614922006337,-0.0012653790174592167,0.0009990519028971268,0.000620048491703367,0.0004771564362796456,-0.00022591170701251524,-0.0006667181560669461,-0.00046685391533160547,-0.00033093758380358003,0.001699209804796262,-0.0004462067255278849,0.0003426139610700278,-0.0006551523032293889,0.0019559062847088055,-0.0031141214024889173,0.0010637453418667331,-0.0005468095962605253,0.00040272265009407975,0.00030860851326417875,-0.0018647785215210075,-0.0024745624932917608,-5.35247445527883e-05,-0.0005670793646301597,-0.0003613121336033365,0.0008151687974082931,-0.0017394895997368766,-0.0020748250337786996,9.329814708464934e-05,1.6448349224507564e-05,-0.002297848255141204,-0.0012770854350293455,0.0005579136862624061,0.00015280078489437088,-0.000709249025

43.96423248882265


In [86]:
# Create a new run
run = create_run(experiment_name)

# Count how many features you have
features = final_data.select(['features']).collect()
features_count = len(features[0][0])
# Then use this number to specify the layers according to best practice
layers = [features_count, features_count+1, features_count, classes]
# Instaniate the classifier
classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# Fit the model (this classifier doesn't have a cross validator option)
fitModel = classifier.fit(train)

# Print the model Weights
print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
   
# Generate predictions on test dataframe
predictions = fitModel.transform(test)
# Calculate accuracy score
accuracy = (MC_evaluator.evaluate(predictions))*100
# Print accuracy score
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)
client.set_tag(run.info.run_id,"Model Weight",fitModel.weights.size)

# # Log Model (can't do this to the client)
# # mlflow.spark.log_model(fitModel, "model")

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

[1mModel Weights: [0m 12023
16.24441132637854


In [87]:
# Create a new run
run = create_run(experiment_name)

# Add parameters of your choice here:
classifier = NaiveBayes()
paramGrid = (ParamGridBuilder() \
             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6]) \
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
# Get Best Model
BestModel = fitModel.bestModel
paramMap = BestModel.extractParamMap()

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

36.95976154992549


In [88]:
# Quick check
class_count = final_data.select(countDistinct("label")).collect()
classes = class_count[0][0]
if classes > 2:
    print("LinearSVC cannot be used because PySpark currently only accepts binary classification data for this algorithm")
else:
    print("Your good to go!")

LinearSVC cannot be used because PySpark currently only accepts binary classification data for this algorithm


In [89]:
# # Create a new run
# run = create_run(experiment_name)

# # Add parameters of your choice here:
# classifier = LinearSVC()
# paramGrid = (ParamGridBuilder() \
#              .addGrid(classifier.maxIter, [10, 15]) \
#              .addGrid(classifier.regParam, [0.1, 0.01]) \
#              .build())

# #Cross Validator requires all of the following parameters:
# crossval = CrossValidator(estimator=classifier,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=MulticlassClassificationEvaluator(),
#                           numFolds=2) # 3 + is best practice
# # Fit Model: Run cross-validation, and choose the best set of parameters.
# fitModel = crossval.fit(train)
# BestModel = fitModel.bestModel

# print('\033[1m' + " Coefficients"+ '\033[0m')
# print("You should compares these relative to eachother")
# print("Coefficients: \n" + str(BestModel.coefficients))
    
# predictions = fitModel.transform(test)
# accuracy = (MC_evaluator.evaluate(predictions))*100
# print(accuracy)

# ########### Track results in MLflow UI ################

# # Add tag to a run
# # Extract the name of the classifier
# classifier_name = type(classifier).__name__
# client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
# client.set_tag(run.info.run_id,"Random Seed",seed)
# client.set_tag(run.info.run_id,"Train Perct",train_val)

# # Log Model (can't do this to the client)
# # mlflow.spark.log_model(fitModel, "model")

# # Extract params of Best Model
# paramMap = BestModel.extractParamMap()

# # Log parameters to the client
# for key, val in paramMap.items():
#     if 'maxIter' in key.name:
#         client.log_param(run.info.run_id, "Max Iter", val)
# for key, val in paramMap.items():
#     if 'regParam' in key.name:
#         client.log_param(run.info.run_id, "Reg Param", 5)

# # Log metrics to the client
# client.log_metric(run.info.run_id, "Accuracy", accuracy)

# # Set a runs status to finished (best practice)
# client.set_terminated(run.info.run_id)

In [90]:
# Create a new run
run = create_run(experiment_name)

# Add parameters of your choice here:
classifier = DecisionTreeClassifier()
paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice
# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Collect and print feature importances
BestModel = fitModel.bestModel
print("Feature Importance Scores (add up to 1)")
featureImportances = BestModel.featureImportances.toArray()
print(featureImportances)

predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
paramMap = BestModel.extractParamMap()

# Log parameters to the client
for key, val in paramMap.items():
    if 'maxDepth' in key.name:
        client.log_param(run.info.run_id, "Max Depth", val)
for key, val in paramMap.items():
    if 'maxBins' in key.name:
        client.log_param(run.info.run_id, "Max Bins", 5)

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

Feature Importance Scores (add up to 1)
[0.02081764 0.05118817 0.01171838 0.01483404 0.         0.
 0.07700441 0.         0.03019932 0.         0.         0.
 0.00824512 0.         0.         0.         0.         0.
 0.00476809 0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.0178979  0.         0.         0.
 0.01173941 0.04071074 0.02011855 0.         0.         0.04577454
 0.         0.03032135 0.         0.06567548 0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.         0.         0.         0.
 0.         0.         0.0491326  0.11353245 0.38632183]
37.85394932935917


In [91]:
# zip input_columns qith feature importance scores and create df

# First convert featureimportance scores from numpy array to list
imp_scores = []
for x in featureImportances:
    imp_scores.append(int(x))
    
# Then zip with input_columns list and create a df
result = spark.createDataFrame(zip(input_columns,imp_scores), schema=['feature','score'])
print(result.orderBy(result["score"].desc()).show(truncate=False))

AttributeError: module 'mlflow.spark' has no attribute 'createDataFrame'

In [92]:
# Create a new run
run = create_run(experiment_name)

# Add parameters of your choice here:
classifier = RandomForestClassifier()
paramGrid = (ParamGridBuilder() \
               .addGrid(classifier.maxDepth, [2, 5, 10])
#                                .addGrid(classifier.maxBins, [5, 10, 20])
#                                .addGrid(classifier.numTrees, [5, 20, 50])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

# Retrieve best model from cross val
BestModel = fitModel.bestModel
print("Feature Importance Scores (add up to 1)")
featureImportances = BestModel.featureImportances.toArray()
print(featureImportances)

predictions = fitModel.transform(test)

accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
paramMap = BestModel.extractParamMap()

# Log parameters to the client
for key, val in paramMap.items():
    if 'maxDepth' in key.name:
        client.log_param(run.info.run_id, "Max Depth", val)
for key, val in paramMap.items():
    if 'maxBins' in key.name:
        client.log_param(run.info.run_id, "Max Bins", 5)

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

Feature Importance Scores (add up to 1)
[0.02452951 0.02826608 0.01846563 0.01537593 0.01019634 0.01112698
 0.01347442 0.00746161 0.01978692 0.01599268 0.01111094 0.01149394
 0.0089437  0.00835238 0.00674792 0.00966588 0.00735222 0.01141882
 0.01016138 0.007549   0.00882493 0.00829091 0.00820047 0.01420782
 0.00910477 0.00993274 0.00790388 0.00809606 0.00909602 0.00760569
 0.00888249 0.00997186 0.0078522  0.01299461 0.01313738 0.0138498
 0.01024035 0.01438733 0.02459726 0.00944065 0.01606044 0.01025337
 0.02334221 0.02162931 0.01151848 0.01102211 0.01304802 0.01362931
 0.01648225 0.01248605 0.01394351 0.01987286 0.01661394 0.01425151
 0.01628015 0.0116149  0.00852716 0.01047246 0.00782175 0.01198437
 0.00956313 0.01325482 0.01428684 0.00548118 0.00733403 0.0111689
 0.00774623 0.01183569 0.04886875 0.03009204 0.08542476]
45.603576751117735


In [93]:
# Create a new run
run = create_run(experiment_name)
    
# Add parameters of your choice here:
classifier = GBTClassifier()

paramGrid = (ParamGridBuilder() \
#                              .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
#                              .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
             .addGrid(classifier.maxIter, [10, 15,50])
             .build())

#Cross Validator requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2) # 3 + is best practice

# Fit Model: Run cross-validation, and choose the best set of parameters.
fitModel = crossval.fit(train)

BestModel = fitModel.bestModel
print("Feature Importance Scores (add up to 1)")
featureImportances = BestModel.featureImportances.toArray()
print(featureImportances)
    
predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

########### Track results in MLflow UI ################

# Add tag to a run
# Extract the name of the classifier
classifier_name = type(classifier).__name__
client.set_tag(run.info.run_id, "Algorithm", classifier_name) 
client.set_tag(run.info.run_id,"Random Seed",seed)
client.set_tag(run.info.run_id,"Train Perct",train_val)

# Log Model (can't do this to the client)
# mlflow.spark.log_model(fitModel, "model")

# Extract params of Best Model
paramMap = BestModel.extractParamMap()

# Log parameters to the client
for key, val in paramMap.items():
    if 'maxDepth' in key.name:
        client.log_param(run.info.run_id, "Max Depth", val)
for key, val in paramMap.items():
    if 'maxBins' in key.name:
        client.log_param(run.info.run_id, "Max Bins", 5)

# Log metrics to the client
client.log_metric(run.info.run_id, "Accuracy", accuracy)

# Set a runs status to finished (best practice)
client.set_terminated(run.info.run_id)

Py4JJavaError: An error occurred while calling o88796.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 25167.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25167.0 (TID 28318) (192.168.29.100 executor driver): java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:177)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:174)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1207)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2296)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2297)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$1(RDD.scala:1209)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1202)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:125)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.boost(GradientBoostedTrees.scala:333)
	at org.apache.spark.ml.tree.impl.GradientBoostedTrees$.run(GradientBoostedTrees.scala:61)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$1(GBTClassifier.scala:210)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:171)
	at org.apache.spark.ml.classification.GBTClassifier.train(GBTClassifier.scala:59)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at jdk.internal.reflect.GeneratedMethodAccessor321.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: requirement failed: GBTClassifier was given dataset with invalid label 2.0.  Labels must be in {0,1}; note that GBTClassifier currently only supports binary classification.
	at scala.Predef$.require(Predef.scala:281)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2(GBTClassifier.scala:177)
	at org.apache.spark.ml.classification.GBTClassifier.$anonfun$train$2$adapted(GBTClassifier.scala:174)
	at org.apache.spark.ml.PredictorParams.$anonfun$extractInstances$2(Predictor.scala:96)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$aggregate$2(RDD.scala:1207)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2296)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [2]:
import mlflow
mlflow.start_run()

<ActiveRun: >