# Classification in PySpark's MLlib Project Solution

Now it's time to apply what we learned in the lectures to a REAL classification project! For this project we will be classifying songs into a set of 23 electronic genres based on a number of audio characteristics. This technology could be used by applications like Pandora to recommend songs to users or just to create meaningful channels. Super fun!

Let's get started!

First let's create our PySpark instance:

In [1]:
# import findspark
# findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("Review2").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Let's read our dataset in for this notebook 

### Context
What makes us humans able to tell apart two songs of different genres? Maybe you have been in the difficult situation of explaining to someone how the music style you like sounds. So, could automatic genre classification be possible?

### Content
Each row is an electronic music song. The dataset contains 100 songs for each of 23 electronic music genres; they were the top 100 songs of their genres in November 2016. The 71 columns are audio features extracted from a random two-minute sample of the audio file. These features were extracted using pyAudioAnalysis (https://github.com/tyiannak/pyAudioAnalysis).

### Source
https://www.kaggle.com/caparrini/beatsdataset

In [2]:
path =""
df = spark.read.csv(path+'beatsdataset.csv',inferSchema=True,header=True)

### Check out the dataset

Let's produce a printout of the dataframe so we know what we are working with.

In [3]:
df.limit(6).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.001529,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.001591,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.003945,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom
5,5,0.127047,0.153488,3.221987,0.261693,0.257361,1.090034,0.004943,0.230099,-21.234846,...,0.002986,0.006533,0.010347,0.025008,0.003035,0.019479,133.333333,0.168933,129.0,BigRoom


In [4]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (nullable = true)
 ...

### How many classes do we have?

Just making sure :) 

We have a perfectly balanced dataset! 

*Note: This never happens in real life :)*

In [5]:
df.groupBy("class").count().show(100)

+--------------------+-----+
|               class|count|
+--------------------+-----+
|           PsyTrance|  100|
|           HardDance|  100|
|              Breaks|  100|
|  HardcoreHardTechno|  100|
|   IndieDanceNuDisco|  100|
|              Trance|  100|
|           DeepHouse|  100|
|ElectronicaDowntempo|  100|
|           ReggaeDub|  100|
|             Minimal|  100|
|         DrumAndBass|  100|
|             Dubstep|  100|
|             BigRoom|  100|
|              Techno|  100|
|               House|  100|
|         FutureHouse|  100|
|        ElectroHouse|  100|
|           GlitchHop|  100|
|           TechHouse|  100|
|              HipHop|  100|
|           FunkRAndB|  100|
|               Dance|  100|
|    ProgressiveHouse|  100|
+--------------------+-----+



## Format Data 

MLlib requires all input columns of your dataframe to be assembled into a single vector column. You will see that we also index our dependent variable into a column named label, as that is the default name MLlib estimators expect. If we do this once here, we never have to do it again!
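
To make the label step concrete, here is a minimal plain-Python sketch (no Spark needed) of the kind of mapping StringIndexer produces under its default frequency-descending ordering, where the most frequent string gets index 0. The helper `index_labels` and the toy genre list are hypothetical, and the alphabetical tie-break is an assumption based on what recent Spark versions document, so treat this as an illustration rather than Spark's implementation.

```python
from collections import Counter

def index_labels(labels):
    """Map each distinct string to a float index, most frequent first.

    Ties are broken alphabetically (an assumption, see the note above).
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy labels, not rows from the beats dataset
genres = ["Techno", "House", "Techno", "Trance", "House", "Techno"]
mapping = index_labels(genres)
indexed = [mapping[g] for g in genres]
print(mapping)
print(indexed)
```

Because our dataset has exactly 100 songs per genre, every class ties on frequency, so the index a genre receives depends entirely on the tie-break rule.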

Let's go ahead and create a function to do all of this 

In [6]:
# Data Prep function
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler

def MLClassifierDFPrep(df,input_columns,dependent_var):
    
    # Cast the label (class variable) to string type to prep for reindexing.
    # PySpark expects a zero-indexed numeric label column, so in case our data
    # is not in that format we treat it with the built-in StringIndexer.
    renamed = df.withColumn("label_str", df[dependent_var].cast(StringType())) # Copy and cast to string type
    indexer = StringIndexer(inputCol="label_str", outputCol="label") # PySpark expects this naming convention
    indexed = indexer.fit(renamed).transform(renamed)

    # Convert all string type data in the input column list to numeric,
    # otherwise the algorithm will not be able to process it
    numeric_inputs = []
    string_inputs = []
    for column in input_columns:
        if isinstance(indexed.schema[column].dataType, StringType):
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num") 
            indexed = indexer.fit(indexed).transform(indexed)
            new_col_name = column+"_num"
            string_inputs.append(new_col_name)
        else:
            numeric_inputs.append(column)
            
    # Check globally for non-normal data (using skewness)
    skew = df.select([F.skewness(c).alias(c) for c in df.columns if c in numeric_inputs]) # Skewness of every numeric input column
    skew_array = skew.select(F.array(numeric_inputs).alias("skew")) # Gather the results into one array column
    skew_min = skew_array.select(F.array_min(skew_array.skew)).collect()[0][0] # Global min as a Python number
    skew_max = skew_array.select(F.array_max(skew_array.skew)).collect()[0][0] # Global max as a Python number
            
    if skew_max > 1 or skew_min < -1:
        # Ask the user whether to floor and cap the data holistically, then act accordingly.
        # approxQuantile calculates approximate quantiles of numeric columns;
        # exact quantiles would take FOREVER.
        print("Looks like your dataframe contains some non normal data which may contain outliers.")
        print("Would you like us to treat for that using flooring, capping and log(x+1) or e^(x+1)")
        print("This may help improve model accuracy.")
        floor_cap = str(input("Enter 1 for yes and 0 for no: "))
        if floor_cap == "1":
            print("Okay, we are correcting for non normality now!")
            # Create a dictionary of [1st, 99th] percentile values per column
            d = {}
            for col in numeric_inputs: 
                d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25) # Increase the last number (relative error) to go faster
            # Now fill in the values
            for col in numeric_inputs:
                skew = indexed.agg(F.skewness(indexed[col])).collect()[0][0] # Check for skewness
                # Floor, cap and then log(x+1) (the +1 is just in case there are 0 values)
                if skew > 1:
                    indexed = indexed.withColumn(col, \
                    F.log(F.when(indexed[col] < d[col][0],d[col][0])\
                    .when(indexed[col] > d[col][1], d[col][1])\
                    .otherwise(indexed[col] ) +1).alias(col))
                    print(col+" has been treated for positive (right) skewness. (skew =",skew,")")
                elif skew < -1:
                    indexed = indexed.withColumn(col, \
                    F.exp(F.when(indexed[col] < d[col][0],d[col][0])\
                    .when(indexed[col] > d[col][1], d[col][1])\
                    .otherwise(indexed[col] )).alias(col))
                    print(col+" has been treated for negative (left) skewness. (skew =",skew,")")
        elif floor_cap == "0":
            print("Okay") 
            print("We will return the dataframe without flooring and capping.")
        else:
            print("You have entered an invalid response.")
            print("We will return the data without flooring and capping")

            
    # Find the global minimum so we can warn that Naive Bayes cannot be used on negative values.
    # Note: we only need to check the numeric input values since anything that is indexed won't have negative values
    minimums = df.select([F.min(c).alias(c) for c in df.columns if c in numeric_inputs]) # Min of every numeric input column
    min_array = minimums.select(F.array(numeric_inputs).alias("mins")) # Gather the mins into one array column
    df_minimum = min_array.select(F.array_min(min_array.mins)).collect()[0][0] # Global min as a Python number

    features_list = numeric_inputs + string_inputs
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    output = assembler.transform(indexed).select('features','label')

    # By default return the assembled data unscaled
    final_data = output
    
    # Now check for negative values and ask the user if they want to correct that
    if df_minimum < 0:
        print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
        print("Would you like to correct that by rescaling all your features to a range of 0 to 1?")
        answer = str(input("Enter 1 for yes and 0 for no: "))
        print(" ")
    
        if answer == "1":
            print("Okay")  
            print("We are rescaling your dataframe....")
            scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

            # Compute summary statistics and generate MinMaxScalerModel
            scalerModel = scaler.fit(output)

            # Rescale each feature to the range [min, max] (0 to 1 by default)
            scaled_data = scalerModel.transform(output)
            final_data = scaled_data.select('label','scaledFeatures')
            final_data = final_data.withColumnRenamed('scaledFeatures','features')
            print("Done!")

        elif answer == "0":
            print("Okay") 
            print("We will return the dataframe unscaled.")
        else:
            print("You have entered an invalid response.")
            print("We will return the data unscaled")
    
    return final_data
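
The skewness check and the floor, cap and log(x+1) treatment inside the function can be hard to picture, so here is a minimal plain-Python sketch of the same idea on one toy column. The helpers `skewness` and `floor_cap_log`, the sample values, and the hand-picked bounds are all hypothetical; in the function above the bounds come from approxQuantile at the 1st and 99th percentiles. The population-skewness formula used here is my assumption about what Spark's skewness aggregate computes.

```python
import math

def skewness(values):
    """Population skewness: third central moment over the cubed std dev."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

def floor_cap_log(values, low, high):
    """Clamp each value to [low, high], then apply log(x + 1)."""
    return [math.log(min(max(v, low), high) + 1) for v in values]

# Hypothetical right-skewed feature with one large outlier
feature = [0.1, 0.2, 0.2, 0.3, 0.3, 0.4, 0.5, 9.0]
before = skewness(feature)
treated = floor_cap_log(feature, low=0.1, high=0.5)
after = skewness(treated)
print("skew before:", before)
print("skew after: ", after)
```

The single outlier pushes the skew above the threshold of 1 that triggers the treatment, and after capping and the log transform the skew falls back inside the -1 to 1 band.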

**Take it for a test run!**

In [7]:
# Read in functions we will need
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.sql.types import * 
from pyspark.sql import functions as F


input_columns = df.columns
input_columns = input_columns[1:-1] # keep only relevant columns: everything but the first and last cols
dependent_var = 'class'

final_data = MLClassifierDFPrep(df,input_columns,dependent_var)
final_data.limit(5).toPandas()
# final_data.show(1,False)

**Split into Test and Training datasets**

In [None]:
train,test = final_data.randomSplit([0.7,0.3])
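
As a rough mental model of this split, each row is sent to the training set with probability 0.7 and to the test set otherwise, so the two sizes are only approximately 70/30. The plain-Python sketch below illustrates that idea and shows why fixing a seed makes the split repeatable. The helper `random_split` is hypothetical and is not how Spark samples internally.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        (train_rows if rng.random() < train_fraction else test_rows).append(row)
    return train_rows, test_rows

rows = list(range(2300))  # stand-in for the 2,300 songs
train_a, test_a = random_split(rows, 0.7, seed=42)
train_b, test_b = random_split(rows, 0.7, seed=42)
print(len(train_a), len(test_a))
print("same split with same seed:", train_a == train_b)
```

In PySpark, randomSplit accepts a seed argument as well, for example final_data.randomSplit([0.7,0.3], seed=42), which helps when you want to compare classifiers on the same split.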

## Create all encompassing Classification Training and Evaluation Function

Let's use our handy dandy function to train and test all the classifiers we have available to us!

For more info on available hyperparameters visit: https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.classification
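
Before adding values to a grid, it helps to know how much work cross-validation will do. The sketch below is plain Python, with a hypothetical helper `build_grid` standing in for ParamGridBuilder. It enumerates every combination for the RandomForestClassifier values used in the function below and counts the model fits, assuming one fit per combination per fold plus one final refit of the best combination.

```python
from itertools import product

def build_grid(param_values):
    """Return every combination of the given hyperparameter values."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*param_values.values())]

# Same values as the RandomForestClassifier grid in the function below
rf_grid = build_grid({
    "maxDepth": [2, 5, 10],
    "maxBins": [5, 10, 20],
    "numTrees": [5, 20, 50],
})
num_folds = 2
fits = len(rf_grid) * num_folds + 1  # +1 for the final refit on all training data
print(len(rf_grid), "combinations,", fits, "model fits")
```

Each extra value in any one list multiplies the total, which is why the larger grids take much longer to run.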

In [None]:
def ClassTrainEval(classifier,features,classes):

    def FindMtype(classifier):
        # Instantiate Model
        M = classifier
        # Learn what it is
        Mtype = type(M).__name__
        
        return Mtype
    
    Mtype = FindMtype(classifier)
    

    def IntanceFitModel(Mtype,classifier,classes,features):
        
        if Mtype == "OneVsRest":
            # instantiate the base classifier.
            lr = LogisticRegression()
            # instantiate the One Vs Rest Classifier.
            OVRclassifier = OneVsRest(classifier=lr)
#             fitModel = OVRclassifier.fit(train)
            # Add parameters of your choice here:
            paramGrid = ParamGridBuilder() \
                .addGrid(lr.regParam, [0.1, 0.01]) \
                .build()
            #Cross Validator requires the following parameters:
            crossval = CrossValidator(estimator=OVRclassifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
        if Mtype == "MultilayerPerceptronClassifier":
            # specify layers for the neural network:
            # input layer of size features, two intermediate of features+1 and same size as features
            # and output of size number of classes
            # Note: crossvalidator cannot be used here
            features_count = len(features[0][0])
            layers = [features_count, features_count+1, features_count, classes]
            MPC_classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
            fitModel = MPC_classifier.fit(train)
            return fitModel
        if Mtype in("LinearSVC","GBTClassifier") and classes != 2: #These classifiers currently only accept binary classification
            print(Mtype," could not be used because PySpark currently only accepts binary classification data for this algorithm")
            return
        if Mtype in("LogisticRegression","NaiveBayes","RandomForestClassifier","GBTClassifier","LinearSVC","DecisionTreeClassifier"):
  
            # Add parameters of your choice here:
            if Mtype in("LogisticRegression"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .addGrid(classifier.maxIter, [10, 15,20,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("NaiveBayes"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]) \
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("RandomForestClassifier"):
                paramGrid = (ParamGridBuilder() \
                               .addGrid(classifier.maxDepth, [2, 5, 10])
                               .addGrid(classifier.maxBins, [5, 10, 20])
                               .addGrid(classifier.numTrees, [5, 20, 50])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("GBTClassifier"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .addGrid(classifier.maxIter, [10, 15,50,100])
                             .build())
                
            # Add parameters of your choice here:
            if Mtype in("LinearSVC"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxIter, [10, 15]) \
                             .addGrid(classifier.regParam, [0.1, 0.01]) \
                             .build())
            
            # Add parameters of your choice here:
            if Mtype in("DecisionTreeClassifier"):
                paramGrid = (ParamGridBuilder() \
                             .addGrid(classifier.maxDepth, [2, 5, 10, 20, 30]) \
                             .addGrid(classifier.maxBins, [10, 20, 40, 80, 100]) \
                             .build())
            
            #Cross Validator requires all of the following parameters:
            crossval = CrossValidator(estimator=classifier,
                                      estimatorParamMaps=paramGrid,
                                      evaluator=MulticlassClassificationEvaluator(),
                                      numFolds=2) # 3 is best practice
            # Fit Model: Run cross-validation, and choose the best set of parameters.
            fitModel = crossval.fit(train)
            return fitModel
    
    fitModel = IntanceFitModel(Mtype,classifier,classes,features)
    
    # Set the column names to match the external results dataframe that we will join with later:
    columns = ['Classifier', 'Result']
    
    if Mtype in("LinearSVC","GBTClassifier") and classes != 2:
        Mtype = [Mtype] # wrap in a list so it can be zipped
        score = ["N/A"]
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
    else:
        predictions = fitModel.transform(test)
        MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy") # predictionCol="prediction" is the default
        accuracy = (MC_evaluator.evaluate(predictions))*100
        Mtype = [Mtype] # wrap in a list so it can be zipped
        score = [str(accuracy)] # convert to a string and wrap in a list
        result = spark.createDataFrame(zip(Mtype,score), schema=columns)
        result = result.withColumn('Result',result.Result.substr(0, 5))
        
    return result
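
The Result column reports accuracy as a percentage. As a minimal plain-Python sketch of what that number means, and of what a "bad" score looks like with 23 balanced classes, consider the following. The helper `accuracy_percent` and the toy predictions are hypothetical.

```python
def accuracy_percent(predictions, labels):
    """Share of predictions that match the true labels, as a percentage."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return 100.0 * correct / len(labels)

# Hypothetical predictions and labels for six songs
labels      = [0.0, 1.0, 2.0, 0.0, 1.0, 2.0]
predictions = [0.0, 1.0, 1.0, 0.0, 2.0, 2.0]
score = accuracy_percent(predictions, labels)

# With 23 equally sized classes, guessing at random is right about 1 time in 23
chance = 100.0 / 23
print("accuracy: %.2f%%" % score)
print("chance level for 23 balanced classes: %.2f%%" % chance)
```

So when reading the results table, compare each classifier against roughly 4 percent rather than against 50 percent.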

In [None]:
# Run!
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql import functions as F
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Comment out Naive Bayes if your data still contains negative values
classifiers = [
                LogisticRegression()
               ,OneVsRest()
               ,LinearSVC()
               ,NaiveBayes()
               ,RandomForestClassifier()
               ,GBTClassifier()
               ,DecisionTreeClassifier()
               ,MultilayerPerceptronClassifier()
              ] 

features = final_data.select(['features']).collect()
# Learn how many classes there are in order to specify evaluation type based on binary or multi and turn the df into an object
class_count = final_data.select(F.countDistinct("label")).collect()
classes = class_count[0][0]

#set up your results table
columns = ['Classifier', 'Result']
vals = [("Place Holder","N/A")]
results = spark.createDataFrame(vals, columns)

for classifier in classifiers:
    new_result = ClassTrainEval(classifier,features,classes)
    results = results.union(new_result)
results = results.where("Classifier!='Place Holder'")
results.show(100,False)

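## Let's try a bit of bagging!

The accuracy rates were pretty poor our first go around. Let's see if we can improve the accuracy of our model using a crude kind of bagging (i.e. just replicating our dataframe x times). Keep in mind that replicated rows can land in both the training and test sets, so accuracy measured this way will look better than it really is.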
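
One thing worth checking with this approach is how many test rows have an identical copy in the training set once the dataframe has been replicated and split at random. The plain-Python sketch below simulates that with row ids only. The helper `leaked_fraction` is hypothetical, and the per-row 70/30 assignment is a simplified stand-in for randomSplit.

```python
import random

def leaked_fraction(n_rows, copies, train_fraction, seed):
    """Fraction of test rows whose identical twin also sits in the training set."""
    rng = random.Random(seed)
    replicated = [row_id for _ in range(copies) for row_id in range(n_rows)]
    train_ids, test_ids = set(), []
    for row_id in replicated:
        if rng.random() < train_fraction:
            train_ids.add(row_id)
        else:
            test_ids.append(row_id)
    leaked = sum(1 for row_id in test_ids if row_id in train_ids)
    return leaked / len(test_ids)

# 2,300 songs, 5 copies each (the original plus four unions), 70/30 split
fraction = leaked_fraction(n_rows=2300, copies=5, train_fraction=0.7, seed=0)
print("share of test rows also present in train:", round(fraction, 3))
```

With a single copy the share is zero by construction, while with five copies nearly every test row has already been seen during training, so a jump in accuracy here mostly reflects memorized rows.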

In [None]:
# Dependent on running the above cell 
replicated_final_data = final_data

#set up your results table
columns = ['Classifier', 'Result']
vals = [("Place Holder","N/A")]
results2 = spark.createDataFrame(vals, columns)

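for y in range(0,4):
    replicated_final_data = replicated_final_data.union(final_data)
    print(replicated_final_data.count()) #for testing
    train,test = replicated_final_data.randomSplit([0.7,0.3])
    # Start a fresh results table for this round so earlier rounds are not repeated
    results2 = spark.createDataFrame(vals, columns)
    for classifier in classifiers:
        new_result = ClassTrainEval(classifier,features,classes)
        results2 = results2.union(new_result)
    results2 = results2.where("Classifier!='Place Holder'")
    # Give each round its own column name (2x, 3x, ... copies) so the joined table stays readable
    results2 = results2.withColumnRenamed('Result', 'Result_' + str(y+2) + 'x')
    results = results.join(results2, ["Classifier"],"inner")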

results.show(100,False)

### Classification Diagnostics

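You can also generate more detailed diagnostics for an individual classifier using the function below. As written it reads attributes that are specific to logistic regression (the training summary, coefficientMatrix and interceptVector), so use it with LogisticRegression. The output is pretty extensive, so I wouldn't do more than one at a time if I were you. 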

In [None]:
from pyspark.ml.evaluation import *
from pyspark.ml.classification import *

def ClassDiag(classifier):
    
    # Fit our model
    C = classifier
    fitModel = C.fit(train)

    # Load the Summary
    trainingSummary = fitModel.summary

    # General Describe
    trainingSummary.predictions.describe().show()

    # View Predictions
    pred_and_labels = fitModel.evaluate(test)
    pred_and_labels.predictions.show()

    # Print the coefficients and intercept for multinomial logistic regression
    print("Coefficients: \n" + str(fitModel.coefficientMatrix))
    print(" ")
    print("Intercept: " + str(fitModel.interceptVector))
    print(" ")

    # Obtain the objective per iteration
    objectiveHistory = trainingSummary.objectiveHistory
    print(" ")
    print("objectiveHistory:")
    for objective in objectiveHistory:
        print(objective)

    # for multiclass, we can inspect metrics on a per-label basis
    print(" ")
    print("False positive rate by label:")
    for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
        print("label %d: %s" % (i, rate))

    print(" ")
    print("True positive rate by label:")
    for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
        print("label %d: %s" % (i, rate))

    print(" ")
    print("Precision by label:")
    for i, prec in enumerate(trainingSummary.precisionByLabel):
        print("label %d: %s" % (i, prec))

    print(" ")
    print("Recall by label:")
    for i, rec in enumerate(trainingSummary.recallByLabel):
        print("label %d: %s" % (i, rec))

    print(" ")
    print("F-measure by label:")
    for i, f in enumerate(trainingSummary.fMeasureByLabel()):
        print("label %d: %s" % (i, f))

    accuracy = trainingSummary.accuracy
    falsePositiveRate = trainingSummary.weightedFalsePositiveRate
    truePositiveRate = trainingSummary.weightedTruePositiveRate
    fMeasure = trainingSummary.weightedFMeasure()
    precision = trainingSummary.weightedPrecision
    recall = trainingSummary.weightedRecall
    print(" ")
    print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
          % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
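
To help read the per-label numbers this function prints, here is a minimal plain-Python sketch of how precision, recall and F-measure are derived for each label from paired predictions and true labels. The helper `per_label_metrics` and the toy data are hypothetical. Spark computes these for you in the training summary, so this is only meant to show the arithmetic.

```python
def per_label_metrics(predictions, labels):
    """Return {label: (precision, recall, f1)} from paired predictions and labels."""
    metrics = {}
    for label in sorted(set(labels)):
        tp = sum(1 for p, y in zip(predictions, labels) if p == label and y == label)
        fp = sum(1 for p, y in zip(predictions, labels) if p == label and y != label)
        fn = sum(1 for p, y in zip(predictions, labels) if p != label and y == label)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        metrics[label] = (precision, recall, f1)
    return metrics

# Hypothetical labels and predictions for six songs across three genres
labels      = [0, 0, 1, 1, 2, 2]
predictions = [0, 1, 1, 1, 2, 0]
for label, (p, r, f1) in per_label_metrics(predictions, labels).items():
    print("label %d: precision=%.2f recall=%.2f f1=%.2f" % (label, p, r, f1))
```

A label with high recall but low precision is one the model predicts too eagerly, which is a useful clue when two genres sound alike.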

In [None]:
# classifier = LogisticRegression()
ClassDiag(LogisticRegression())