# Classification in PySpark's MLlib Project Solution

### Genre classification
Now it's time to apply what we learned in the lectures to a REAL classification project! Have you ever wondered what makes us humans able to tell apart two songs of different genres? How do we inherently know the difference between a pop song and a heavy metal song? This kind of classification may seem easy for us, but it is a very difficult challenge for a computer. So the question is: could an automatic genre classification model be possible?

For this project we will be classifying songs based on a number of characteristics into a set of 23 electronic genres. This technology could be used by an application like Pandora to recommend songs to users or just create meaningful channels. Super fun!

### Dataset
*beatsdataset.csv*
Each row is an electronic music song. The dataset contains 100 songs for each of 23 electronic music genres: the top 100 songs of each genre in November 2016. The 71 feature columns are audio features extracted from a random two-minute sample of each song's audio file. These features have been extracted using pyAudioAnalysis (https://github.com/tyiannak/pyAudioAnalysis).

### Your task
Create an algorithm that classifies songs into the 23 genres provided. Test out several different models and select the highest performing one. Also play around with feature selection methods and finally try to make a recommendation to a user.  
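For the recommendation step, one simple approach (an assumption, not something the lectures prescribe) is content-based: represent every song by its scaled feature vector and suggest the songs closest to one the user already likes, optionally restricted to the same predicted genre. The stdlib sketch below ranks a toy catalogue by cosine similarity; `cosine_similarity`, `recommend`, and the song ids are hypothetical names for illustration only.

```python
import math

# math.fsum is used instead of sum because the notebook's
# `from pyspark.sql.functions import *` shadows the built-in sum.


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length feature vectors."""
    dot = math.fsum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(math.fsum(x * x for x in a))
    norm_b = math.sqrt(math.fsum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def recommend(liked, catalogue, k=2):
    """Return the ids of the k catalogue songs most similar to `liked`.

    `catalogue` maps a hypothetical song id to its (scaled) feature vector.
    """
    ranked = sorted(
        catalogue.items(),
        key=lambda item: cosine_similarity(liked, item[1]),
        reverse=True,
    )
    return [song_id for song_id, _ in ranked[:k]]


# Toy three-feature vectors; real rows would carry all 71 scaled features.
catalogue = {
    "song_a": [0.9, 0.1, 0.2],
    "song_b": [0.1, 0.8, 0.9],
    "song_c": [0.8, 0.2, 0.1],
}
print(recommend([1.0, 0.0, 0.1], catalogue))  # ['song_a', 'song_c']
```

Cosine similarity ignores vector length, so two songs with the same feature profile but different overall levels still count as close; Euclidean distance would be a reasonable alternative if absolute levels matter.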

For the feature selection part of this project, you may need to get a bit creative if you want to select features for a non-tree algorithm. I intentionally did not cover this part of PySpark in the previous lectures, to give you a chance to get used to researching the PySpark documentation. Here is the link to the Feature Selectors section of the documentation, which just might come in handy: https://spark.apache.org/docs/latest/ml-features.html#feature-selectors
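One non-tree option on that page is `UnivariateFeatureSelector` (available in Spark 3.1 and later, if your version has it): with `featureType='continuous'` and `labelType='categorical'`, it scores each feature with an ANOVA F-test and keeps the top-scoring ones. The stdlib sketch below computes that F statistic for individual features so the ranking idea is concrete; it illustrates the statistic, it is not Spark's implementation, and the toy feature values are made up.

```python
import math

# math.fsum is used instead of sum because the notebook's
# `from pyspark.sql.functions import *` shadows the built-in sum.


def anova_f(values, labels):
    """One-way ANOVA F statistic of one numeric feature grouped by class label."""
    groups = {}
    for value, label in zip(values, labels):
        groups.setdefault(label, []).append(value)
    n, k = len(values), len(groups)
    grand_mean = math.fsum(values) / n
    means = {label: math.fsum(g) / len(g) for label, g in groups.items()}
    # Between-class spread: how far each class mean sits from the overall mean.
    ss_between = math.fsum(
        len(g) * (means[label] - grand_mean) ** 2 for label, g in groups.items()
    )
    # Within-class spread: how far songs sit from their own class mean.
    ss_within = math.fsum(
        (v - means[label]) ** 2 for label, g in groups.items() for v in g
    )
    return (ss_between / (k - 1)) / (ss_within / (n - k))


labels = ["A", "A", "A", "B", "B", "B"]
features = {
    "separating": [1.0, 1.1, 0.9, 3.0, 3.1, 2.9],
    "uninformative": [1.0, 3.0, 2.0, 2.0, 1.0, 3.0],
}
scores = {name: anova_f(col_values, labels) for name, col_values in features.items()}
ranking = sorted(scores, key=scores.get, reverse=True)
print(ranking)  # ['separating', 'uninformative']
```

A large F means the class means differ a lot relative to the spread inside each class, which is what makes a feature useful for telling genres apart.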

Good luck! Have fun :)

### Source
https://www.kaggle.com/caparrini/beatsdataset

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ClassificationProject').getOrCreate()



In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

In [3]:
path ="Datasets/"
df = spark.read.csv(path+'beatsdataset.csv',inferSchema=True,header=True)

In [4]:
df.limit(5).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom
2,2,0.085308,0.128525,3.123837,0.217205,0.228652,0.789647,0.008247,0.156822,-22.472722,...,0.001529,0.004556,0.007723,0.017482,0.002901,0.022201,133.333333,0.123373,129.0,BigRoom
3,3,0.10305,0.167042,3.15083,0.233593,0.245032,0.967082,0.006571,0.168083,-21.470751,...,0.001591,0.003514,0.009477,0.023162,0.004165,0.015379,133.333333,0.158876,129.0,BigRoom
4,4,0.15173,0.148405,3.194498,0.29373,0.267231,1.353005,0.003872,0.292055,-21.371157,...,0.003945,0.004131,0.01133,0.028188,0.002639,0.019079,133.333333,0.190708,129.0,BigRoom


In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (null

In [6]:
df.groupBy('class').count().show(100)

+--------------------+-----+
|               class|count|
+--------------------+-----+
|           PsyTrance|  100|
|           HardDance|  100|
|              Breaks|  100|
|  HardcoreHardTechno|  100|
|   IndieDanceNuDisco|  100|
|              Trance|  100|
|           DeepHouse|  100|
|ElectronicaDowntempo|  100|
|           ReggaeDub|  100|
|             Minimal|  100|
|         DrumAndBass|  100|
|             Dubstep|  100|
|             BigRoom|  100|
|              Techno|  100|
|               House|  100|
|         FutureHouse|  100|
|        ElectroHouse|  100|
|           GlitchHop|  100|
|           TechHouse|  100|
|              HipHop|  100|
|           FunkRAndB|  100|
|               Dance|  100|
|    ProgressiveHouse|  100|
+--------------------+-----+



## Format Data

In [7]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+------+---------+----------------+-------------------+-----------------+------------------+---------------+------------------+---------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+-------------------+---------+------------+-------------------+----------------------+--------------------+---------------------+------------------+---------------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-

In [8]:
input_columns = df.columns # Collect the column names as a list
input_columns = input_columns[1:-1] # drop the first column (_c0, a row index) and the last column (class, the label)

dependent_var = 'class'

In [9]:
renamed = df.withColumn('label_str', df[dependent_var].cast(StringType()))
indexer = StringIndexer(inputCol='label_str', outputCol='label')
indexed = indexer.fit(renamed).transform(renamed)
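`StringIndexer` uses `stringOrderType='frequencyDesc'` by default, so the most frequent string gets index 0.0. In recent Spark versions, strings with equal frequency are further sorted alphabetically, and since every genre here has exactly 100 songs the mapping is effectively alphabetical; that is consistent with the first rows (all `BigRoom`) receiving label 0.0 in the `output.show()` result further down. The stdlib sketch below mimics that ordering rule; `frequency_desc_index` is a hypothetical helper, not Spark code.

```python
from collections import Counter


def frequency_desc_index(labels):
    """Map each distinct string to a float index, most frequent first.

    Strings with equal frequency are ordered alphabetically.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


# Every genre in beatsdataset.csv has 100 songs, so the order is alphabetical.
genres = ["Techno"] * 100 + ["BigRoom"] * 100 + ["Breaks"] * 100
mapping = frequency_desc_index(genres)
print(mapping)  # {'BigRoom': 0.0, 'Breaks': 1.0, 'Techno': 2.0}
```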

In [10]:
numeric_inputs = []
string_inputs = []
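for column in input_columns:
    # isinstance is more robust than comparing str(dataType) to 'StringType',
    # whose string form can differ across Spark versions (e.g. 'StringType()').
    if isinstance(indexed.schema[column].dataType, StringType):
        indexer = StringIndexer(inputCol = column, outputCol=column+'_num')
        indexed = indexer.fit(indexed).transform(indexed)
        new_col_name = column+'_num'
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)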
indexed.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (null

#### Skewness

In [11]:
d = {}
for c in numeric_inputs:
    # Approximate 1st and 99th percentiles, used as clipping bounds below.
    # A relativeError of 0.25 is coarse; lower it for tighter bounds.
    d[c] = indexed.approxQuantile(c, [0.01, 0.99], 0.25)

# The loop variable is `c`, not `col`, so it does not shadow pyspark.sql.functions.col.
for c in numeric_inputs:
    skew = indexed.agg(skewness(indexed[c])).collect()[0][0]
    # Clip the column to its percentile bounds before transforming it.
    clipped = when(indexed[c] < d[c][0], d[c][0])\
        .when(indexed[c] > d[c][1], d[c][1])\
        .otherwise(indexed[c])
    if skew > 1:
        # Positive (right) skew: compress the long upper tail with log(x + 1).
        indexed = indexed.withColumn(c, log(clipped + 1))
        print(c, " has been treated for pos skew", skew)
    elif skew < -1:
        # Negative (left) skew: stretch the upper values with exp(x + 1).
        indexed = indexed.withColumn(c, exp(clipped + 1))
        print(c, " has been treated for neg skew", skew)

7-SpectralFluxm  has been treated for pos skew 1.6396138160129063
22-ChromaVector1m  has been treated for pos skew 2.4162415204309258
23-ChromaVector2m  has been treated for pos skew 4.154796693680583
24-ChromaVector3m  has been treated for pos skew 1.1974019617504328
25-ChromaVector4m  has been treated for pos skew 2.446635863594906
26-ChromaVector5m  has been treated for pos skew 2.154482876187508
27-ChromaVector6m  has been treated for pos skew 2.01234064472543
28-ChromaVector7m  has been treated for pos skew 1.1829228989215521
29-ChromaVector8m  has been treated for pos skew 3.7372643733999955
30-ChromaVector9m  has been treated for pos skew 2.4117416421548645
31-ChromaVector10m  has been treated for pos skew 2.1979538518563233
32-ChromaVector11m  has been treated for pos skew 2.1924295373960554
33-ChromaVector12m  has been treated for pos skew 2.278981912155668
41-SpectralFluxstd  has been treated for pos skew 1.8577721462401056
56-ChromaVector1std  has been treated for pos skew 1
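Spark's `skewness` aggregate returns the population skewness: the third central moment divided by the second central moment raised to the power 1.5. The stdlib sketch below computes that statistic and applies the same clip-then-`log(x + 1)` treatment as the cell above, so you can check on a toy list that the transform pulls in a long right tail. The clip bounds are passed in directly as stand-ins for the `approxQuantile` percentiles; the helper names are hypothetical.

```python
import math

# math.fsum and explicit comparisons are used instead of sum/min/max, which the
# notebook's `from pyspark.sql.functions import *` shadows.


def skewness(xs):
    """Population skewness m3 / m2**1.5, with m_k the k-th central moment."""
    n = len(xs)
    mean = math.fsum(xs) / n
    m2 = math.fsum((x - mean) ** 2 for x in xs) / n
    m3 = math.fsum((x - mean) ** 3 for x in xs) / n
    return m3 / m2 ** 1.5


def clip_log1p(xs, low, high):
    """Clip each value into [low, high], then apply log(x + 1)."""
    clipped = [low if x < low else high if x > high else x for x in xs]
    return [math.log1p(x) for x in clipped]


values = [1, 2, 4, 8, 16, 32, 64, 128]
# low/high stand in for the 1st/99th percentiles from approxQuantile.
treated = clip_log1p(values, 1, 64)
print(f"before: {skewness(values):.2f}, after: {skewness(treated):.2f}")
```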

In [12]:
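# Smallest value across all numeric feature columns of the raw DataFrame
# (`min` here is pyspark.sql.functions.min, via the star import).
# A negative minimum rules out models such as NaiveBayes, which need
# non-negative features, unless the features are rescaled first.
minimums = df.select([min(c).alias(c) for c in df.columns if c in numeric_inputs])
min_array = minimums.select(array(numeric_inputs).alias('mins'))
df_minimum = min_array.select(array_min(min_array.mins)).collect()
df_minimum = df_minimum[0][0]
df_minimum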

-30.3789543716

In [13]:
features_list = numeric_inputs + string_inputs
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
output = assembler.transform(indexed).select('features','label')


In [14]:
output.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.136439587512,0...|  0.0|
|[0.117038518483,0...|  0.0|
|[0.0853077737447,...|  0.0|
|[0.103049917216,0...|  0.0|
|[0.151729948738,0...|  0.0|
|[0.127046737192,0...|  0.0|
|[0.123395302003,0...|  0.0|
|[0.140027382431,0...|  0.0|
|[0.117635200751,0...|  0.0|
|[0.137400181488,0...|  0.0|
|[0.148838734199,0...|  0.0|
|[0.119846749928,0...|  0.0|
|[0.0786813648231,...|  0.0|
|[0.138335144235,0...|  0.0|
|[0.101304207661,0...|  0.0|
|[0.132862180406,0...|  0.0|
|[0.1533035231,0.1...|  0.0|
|[0.118001599962,0...|  0.0|
|[0.110992493712,0...|  0.0|
|[0.124299777916,0...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [15]:
# Rescale every feature into [0, 1000]; the result is non-negative, which NaiveBayes requires.
scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures', min=0, max=1000)
scalerModel = scaler.fit(output)
scaled_data= scalerModel.transform(output)
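`MinMaxScaler` rescales each feature independently using the column minimum `E_min` and maximum `E_max` seen during `fit`: `(x - E_min) / (E_max - E_min) * (max - min) + min`, and a constant column (`E_max == E_min`) is mapped to `0.5 * (max + min)`. The stdlib sketch below reproduces that rule for one column with `min=0, max=1000`, as in the cell above; `min_max_scale` is a hypothetical helper for illustration, not Spark's code.

```python
def min_max_scale(column, new_min=0.0, new_max=1000.0):
    """Rescale one feature column into [new_min, new_max] with MinMaxScaler's rule."""
    # sorted() rather than min()/max(): the notebook's star import shadows them.
    ordered = sorted(column)
    e_min, e_max = ordered[0], ordered[-1]
    if e_max == e_min:
        # A constant column is mapped to the midpoint of the target range.
        return [0.5 * (new_max + new_min)] * len(column)
    span = e_max - e_min
    return [(x - e_min) / span * (new_max - new_min) + new_min for x in column]


print(min_max_scale([-30.0, -10.0, 10.0]))  # [0.0, 500.0, 1000.0]
```

Because the rescaling is per column, the negative overall minimum found earlier ends up at 0 for its own column, so every scaled feature is non-negative.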

In [16]:
scaled_data.show()

+--------------------+-----+--------------------+
|            features|label|      scaledFeatures|
+--------------------+-----+--------------------+
|[0.136439587512,0...|  0.0|[519.818266700239...|
|[0.117038518483,0...|  0.0|[435.295463992565...|
|[0.0853077737447,...|  0.0|[297.057129121742...|
|[0.103049917216,0...|  0.0|[374.352647734368...|
|[0.151729948738,0...|  0.0|[586.432337466259...|
|[0.127046737192,0...|  0.0|[478.897323979332...|
|[0.123395302003,0...|  0.0|[462.989461602348...|
|[0.140027382431,0...|  0.0|[535.448873531282...|
|[0.117635200751,0...|  0.0|[437.894973202184...|
|[0.137400181488,0...|  0.0|[524.003195633498...|
|[0.148838734199,0...|  0.0|[573.836456502165...|
|[0.119846749928,0...|  0.0|[447.529820358620...|
|[0.0786813648231,...|  0.0|[268.188480022892...|
|[0.138335144235,0...|  0.0|[528.076459415282...|
|[0.101304207661,0...|  0.0|[366.747280005321...|
|[0.132862180406,0...|  0.0|[504.232915471732...|
|[0.1533035231,0.1...|  0.0|[593.287780078539...|


In [17]:
final_data=scaled_data.select('label', 'scaledFeatures')
final_data = final_data.withColumnRenamed('scaledFeatures', 'features')

In [18]:
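# No seed is set, so the split (and every score below) changes between runs;
# pass an integer seed to randomSplit to make it reproducible.
train, test= final_data.randomSplit([0.7, 0.3])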

In [19]:
train.count()

1542

In [20]:
test.count()

758

### Modeling

In [22]:
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.functions import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [23]:
# Only meaningful for binary labels; not used for this 23-class problem.
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')
MC_evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
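`MulticlassClassificationEvaluator` defaults to `metricName='f1'`, a support-weighted F1 score, so a `MulticlassClassificationEvaluator()` created without arguments (as in the One Vs Rest and NaiveBayes cross-validation cells below) selects models by F1, while the number printed afterwards is accuracy from `MC_evaluator`. The stdlib sketch below mirrors both definitions on made-up (label, prediction) pairs to show that they can differ; the helper names are hypothetical.

```python
from collections import Counter


def accuracy(pairs):
    """Fraction of (label, prediction) pairs that match."""
    return len([1 for y, p in pairs if y == p]) / len(pairs)


def weighted_f1(pairs):
    """Per-class F1 averaged with weights equal to each class's true count."""
    support = Counter(y for y, _ in pairs)
    predicted = Counter(p for _, p in pairs)
    total = 0.0
    for label, count in support.items():
        tp = len([1 for y, p in pairs if y == label and p == label])
        precision = tp / predicted[label] if predicted[label] else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if tp else 0.0
        total += count * f1
    return total / len(pairs)


pairs = [(0, 0), (0, 0), (0, 0), (0, 1), (1, 1), (2, 1)]
print(accuracy(pairs), weighted_f1(pairs))
```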

### Logit

In [24]:
classifier= LogisticRegression()

paramGrid = (ParamGridBuilder().addGrid(classifier.maxIter, [10, 15,20]).build())

crossval = CrossValidator(estimator=classifier,
                         estimatorParamMaps=paramGrid,
                         evaluator=MC_evaluator,
                         numFolds=2)

fitModel = crossval.fit(train)
BestModel = fitModel.bestModel

print('Intercept: ' +str(BestModel.interceptVector))
print('Coefficients: \n' + str(BestModel.coefficientMatrix))

LR_BestModel = BestModel

predictions=fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100

print(accuracy)


Intercept: [-0.03104983004448102,-0.038428531281883775,-0.022001324082627918,0.00709561457789455,0.029615151579420754,-0.02629458790663195,0.025654467757890802,-0.018218998451222105,-0.10066197359292689,0.01483491625043558,-0.10651904137627013,-0.05562154550470953,0.03145330183031093,6.808826114114891e-05,0.024225357003700203,-0.005416235348866503,0.04892804719922859,0.03279665219289679,0.0023942968736978144,0.0029301810273619842,0.12653773119838008,-0.054488162324710966,0.11216642416197156]
Coefficients: 
DenseMatrix([[ 5.53467255e-04,  2.34584211e-03,  2.13934111e-03, ...,
              -1.79073279e-03, -6.98152751e-04,  1.78029467e-03],
             [-3.19102280e-04, -2.60925678e-04, -1.46814209e-04, ...,
               4.47028155e-03, -3.40749700e-03,  2.54846963e-03],
             [-5.66652158e-04,  2.76863352e-04,  1.30480733e-03, ...,
              -3.35299419e-03,  3.84047476e-04, -1.61011355e-03],
             ...,
             [-5.97643001e-05, -8.66986553e-04, -2.34401688e-0

In [25]:

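# Row 0 of coefficientMatrix holds the coefficients for the class with label 0.0 only.
coeff_array = LR_BestModel.coefficientMatrix.toArray()
coeff_scores = [float(x) for x in coeff_array[0]]

# features_list matches the VectorAssembler input order.
result = spark.createDataFrame(list(zip(features_list, coeff_scores)), schema=['feature','coeff'])
result.show(100)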

+--------------------+--------------------+
|             feature|               coeff|
+--------------------+--------------------+
|              1-ZCRm|5.534672548442998E-4|
|           2-Energym|0.002345842112034337|
|    3-EnergyEntropym|0.002139341110883...|
| 4-SpectralCentroidm|7.994319519025507E-4|
|   5-SpectralSpreadm|0.001391952828275...|
|  6-SpectralEntropym|0.001018927416613...|
|     7-SpectralFluxm|-0.00198236406404...|
|  8-SpectralRolloffm|3.924082428112009...|
|           9-MFCCs1m|0.003862276911599268|
|          10-MFCCs2m|-0.00161731856750...|
|          11-MFCCs3m|-3.37369240679970...|
|          12-MFCCs4m|-7.19782834790417...|
|          13-MFCCs5m|-4.72482412778606...|
|          14-MFCCs6m|-1.92152670244053...|
|          15-MFCCs7m|-2.09017086414245...|
|          16-MFCCs8m|-7.30984681635879...|
|          17-MFCCs9m|-7.81897691117023...|
|         18-MFCCs10m|0.001141793014895319|
|         19-MFCCs11m| 2.70643996825311E-4|
|         20-MFCCs12m|7.48061279
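The table above lists the coefficients of a single class row (label 0.0), so it ranks features for one genre only. One rough way to rank features across all 23 genres (an assumption, not an MLlib feature-importance API) is to average each feature's absolute coefficient over every class row; since all features were rescaled into the same [0, 1000] range, the magnitudes are at least on a comparable scale. The stdlib sketch below works on a nested list such as `LR_BestModel.coefficientMatrix.toArray().tolist()` would give; the toy matrix and feature names are made up.

```python
import math


def mean_abs_coefficients(matrix, feature_names):
    """Rank features by their mean absolute coefficient over all class rows.

    matrix has one row per class and one column per feature, like
    coefficientMatrix.toArray().tolist().
    """
    n_rows = len(matrix)
    # math.fsum/math.fabs avoid sum/abs, which the notebook's star import shadows.
    scores = {
        name: math.fsum(math.fabs(row[j]) for row in matrix) / n_rows
        for j, name in enumerate(feature_names)
    }
    # Largest mean |coefficient| first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


toy_matrix = [
    [0.5, -0.1, 0.0],
    [-0.7, 0.2, 0.1],
    [0.3, -0.3, -0.1],
]
ranked = mean_abs_coefficients(toy_matrix, ["feat_a", "feat_b", "feat_c"])
print(ranked)
```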

### One Vs Rest Classifier

In [26]:
lr = LogisticRegression()
classifier = OneVsRest(classifier = lr)

paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam,[0.1,0.01]) \
    .build()

crossval=CrossValidator(estimator=classifier,
                       estimatorParamMaps=paramGrid,
                       evaluator=MulticlassClassificationEvaluator(),
                       numFolds=2)

fitModel=crossval.fit(train)

BestModel = fitModel.bestModel

models= BestModel.models
for model in models:
    print('\033[1m' + 'Intercept: '+ '\033[0m',model.intercept,'\033[1m' + '\nCoefficients:'+ '\033[0m',model.coefficients)

predictions=fitModel.transform(test)
accuracy=(MC_evaluator.evaluate(predictions))*100
print(accuracy)

[1mIntercept: [0m -5.883843652842539 [1m
Coefficients:[0m [-8.873191421856737e-05,0.001578533880308014,0.0018473890016661566,-0.00021297003760045228,0.002117392347778086,0.000614602852798278,-0.0014825429713786794,-0.0005986979506783181,0.004221411526926817,-0.0011329618139343561,0.0008278956200117906,0.00047797643930793575,-8.260412464857321e-05,-0.00035922132428955426,0.00020568231914626185,-0.0006875910098533727,-0.0018690251449266302,0.0015708668154802861,-0.00016600446977813567,0.0006429138057868993,-0.001542245446465967,0.001219442312357794,-0.0026332883116698532,0.0009154958815145765,-0.0013379917283713016,-0.0001995669825103106,0.0004551743261514774,-0.0016589206878034198,-0.0023464294832659122,0.0002256935670247355,-4.584588859374557e-05,-0.00042313741744579107,0.00032606839288655033,-0.0018685632905306995,-0.0019584170965785026,0.0002615250050379769,-0.00032825905787346813,-0.0018977035110700172,-0.0010511746349675902,0.0002750299267046536,-0.0003339093131621355,-0.000748

46.701846965699204
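`OneVsRest` trains one binary classifier per class (23 here, hence the 23 intercept/coefficient pairs printed by the loop) and predicts the class whose classifier is most confident. With logistic regression as the base model, that confidence is the positive-class raw prediction, i.e. the margin `w·x + b`. The stdlib sketch below shows the decision rule; `ovr_predict` and the three toy models are hypothetical.

```python
import math


def ovr_predict(x, models):
    """Return the index of the binary model with the largest margin w.x + b.

    models is a list of (coefficients, intercept) pairs, one per class.
    """
    best_label, best_margin = None, -math.inf
    for label, (coefficients, intercept) in enumerate(models):
        margin = math.fsum(w * xi for w, xi in zip(coefficients, x)) + intercept
        if margin > best_margin:
            best_label, best_margin = label, margin
    return float(best_label)


# Three hypothetical one-vs-rest models over two features.
models = [([1.0, -1.0], 0.0), ([-1.0, 1.0], 0.0), ([0.0, 0.0], 0.5)]
print(ovr_predict([2.0, 0.5], models))  # 0.0
```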


### Multilayer Perceptron Classifier


In [27]:
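# Input layer size = length of one feature vector; output size = number of genres.
# first() reads a single row instead of collecting the whole dataset.
features_count = len(final_data.select('features').first()[0])
class_count=final_data.select(countDistinct('label')).collect()
classes = class_count[0][0]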

layers = [features_count, features_count+1, features_count, classes]

classifier = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed= 1234)

fitModel = classifier.fit(train)

print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)


predictions = fitModel.transform(test)

accuracy = (MC_evaluator.evaluate(predictions))*100

print("Accuracy: ",accuracy)

[1mModel Weights: [0m 12023
Accuracy:  13.984168865435356
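The weight count printed above follows directly from `layers`: each unit after the input layer has one weight per incoming unit plus one bias. With 71 input features and 23 classes, `layers = [71, 72, 71, 23]`, which gives 12,023 parameters, matching the output. The sketch below checks that arithmetic; `mlp_weight_count` is a hypothetical helper.

```python
def mlp_weight_count(layers):
    """Number of weights and biases in a fully connected feed-forward network."""
    total = 0
    for n_in, n_out in zip(layers, layers[1:]):
        total += (n_in + 1) * n_out  # n_in weights + 1 bias per output unit
    return total


features_count, classes = 71, 23
layers = [features_count, features_count + 1, features_count, classes]
print(mlp_weight_count(layers))  # 12023
```

Spark's multilayer perceptron uses sigmoid activations in its hidden layers and softmax at the output, so one plausible (untested) contributor to the low accuracy is that the inputs were scaled to [0, 1000] rather than a small range such as [0, 1], which can saturate the sigmoids; refitting with a separately scaled feature column would be a cheap experiment.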


### NaiveBayes

In [None]:
classifier= NaiveBayes()
paramGrid = (ParamGridBuilder()\
             .addGrid(classifier.smoothing, [0.0,0.2,0.4,0.6])\
             .build())
crossval = CrossValidator(estimator=classifier,
                         estimatorParamMaps = paramGrid,
                         evaluator= MulticlassClassificationEvaluator(),
                         numFolds=2)

fitModel = crossval.fit(train)

predictions = fitModel.transform(test)
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy: ",accuracy)
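With its default `modelType='multinomial'`, `NaiveBayes` treats feature values like counts and rejects negative inputs, which is why the features were rescaled into [0, 1000] first. Its `smoothing` parameter is additive smoothing of the per-class feature probabilities, `(s_j + alpha) / (sum(s) + alpha * d)`, where `s_j` is the summed value of feature `j` within a class and `d` is the number of features. The stdlib sketch below computes those log-probabilities for one class and shows why `smoothing=0.0` in the grid can yield a log-probability of negative infinity for a feature that sums to zero within a class; the helper and toy sums are hypothetical.

```python
import math


def multinomial_log_theta(feature_sums, smoothing):
    """Per-feature log P(feature | class) with additive smoothing.

    feature_sums holds the summed (non-negative) feature values of one class.
    """
    d = len(feature_sums)
    denominator = math.fsum(feature_sums) + smoothing * d
    log_theta = []
    for s in feature_sums:
        numerator = s + smoothing
        # math.log(0) raises, so represent log(0) explicitly as -inf.
        log_theta.append(math.log(numerator / denominator) if numerator > 0 else -math.inf)
    return log_theta


class_sums = [6.0, 2.0, 0.0]  # hypothetical summed features for one genre
print(multinomial_log_theta(class_sums, 0.0))
print(multinomial_log_theta(class_sums, 1.0))
```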
