
# Classification in PySpark's MLlib Project Solution

### Genre classification
Now it's time to apply what we learned in the lectures to a REAL classification project! Have you ever wondered what makes us humans able to tell apart two songs of different genres? How do we inherently know the difference between a pop song and heavy metal? This type of classification may seem easy for us, but it's a very difficult challenge for a computer. So the question is: is an automatic genre classification model possible?

For this project we will be classifying songs based on a number of characteristics into a set of 23 electronic genres. This technology could be used by an application like Pandora to recommend songs to users or just create meaningful channels. Super fun!

### Dataset
*beatsdataset.csv*
Each row is an electronic music song. The dataset contains 100 songs for each of 23 electronic music genres; they were the top 100 songs of their genres in November 2016. The 71 feature columns are audio features extracted from a random two-minute sample of each song's audio file. These features were extracted using pyAudioAnalysis (https://github.com/tyiannak/pyAudioAnalysis).

### Your task
Create an algorithm that classifies songs into the 23 genres provided. Test out several different models and select the highest performing one. Also play around with feature selection methods and finally try to make a recommendation to a user.  

For the feature selection part of this project, you may need to get a bit creative if you want to select features from a non-tree algorithm. I intentionally did not cover this part of PySpark in the previous lectures, to give you a chance to get used to researching the PySpark documentation. Here is the link to the Feature Selectors section of the documentation, which just might come in handy: https://spark.apache.org/docs/latest/ml-features.html#feature-selectors

Good luck! Have fun :)

### Source
https://www.kaggle.com/caparrini/beatsdataset

In [None]:
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [None]:
# Install pyspark
!pip install pyspark
# Outside Colab, with a separate local Spark install, you may need findspark:
# import findspark
# findspark.init()
import pyspark  # if using findspark, import only after findspark.init()
from pyspark.sql import SparkSession
# May take a while locally
spark = SparkSession.builder.appName("SparkML").getOrCreate()

# In local mode, defaultParallelism equals the number of cores Spark is using
cores = spark.sparkContext.defaultParallelism
print("You are working with", cores, "core(s)")
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=9912ad7d7077cc0b11df43d4b2367d0f3902b437dbf8b6b9ebc19746c0c84c5c
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
path = '/content/drive/MyDrive/Courses/Spark/pysparkML/Datasets/'
beats = spark.read.csv(path+'beatsdataset.csv',inferSchema=True,header=True)

In [None]:
beats.limit(2).toPandas()

Unnamed: 0,_c0,1-ZCRm,2-Energym,3-EnergyEntropym,4-SpectralCentroidm,5-SpectralSpreadm,6-SpectralEntropym,7-SpectralFluxm,8-SpectralRolloffm,9-MFCCs1m,...,63-ChromaVector8std,64-ChromaVector9std,65-ChromaVector10std,66-ChromaVector11std,67-ChromaVector12std,68-ChromaDeviationstd,69-BPM,70-BPMconf,71-BPMessentia,class
0,0,0.13644,0.088861,3.201201,0.262825,0.249212,1.114423,0.007003,0.256682,-22.723259,...,0.003431,0.004981,0.010818,0.024001,0.005201,0.015056,133.333333,0.132792,128.0,BigRoom
1,1,0.117039,0.108389,3.194001,0.247657,0.250288,1.065668,0.005387,0.199821,-21.775871,...,0.004461,0.006441,0.007469,0.015499,0.005589,0.019339,120.0,0.112767,126.0,BigRoom


In [None]:
beats.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- 1-ZCRm: double (nullable = true)
 |-- 2-Energym: double (nullable = true)
 |-- 3-EnergyEntropym: double (nullable = true)
 |-- 4-SpectralCentroidm: double (nullable = true)
 |-- 5-SpectralSpreadm: double (nullable = true)
 |-- 6-SpectralEntropym: double (nullable = true)
 |-- 7-SpectralFluxm: double (nullable = true)
 |-- 8-SpectralRolloffm: double (nullable = true)
 |-- 9-MFCCs1m: double (nullable = true)
 |-- 10-MFCCs2m: double (nullable = true)
 |-- 11-MFCCs3m: double (nullable = true)
 |-- 12-MFCCs4m: double (nullable = true)
 |-- 13-MFCCs5m: double (nullable = true)
 |-- 14-MFCCs6m: double (nullable = true)
 |-- 15-MFCCs7m: double (nullable = true)
 |-- 16-MFCCs8m: double (nullable = true)
 |-- 17-MFCCs9m: double (nullable = true)
 |-- 18-MFCCs10m: double (nullable = true)
 |-- 19-MFCCs11m: double (nullable = true)
 |-- 20-MFCCs12m: double (nullable = true)
 |-- 21-MFCCs13m: double (nullable = true)
 |-- 22-ChromaVector1m: double (null

In [None]:
# Count of observations per class (genre)
beats.groupBy('class').count().show()

+--------------------+-----+
|               class|count|
+--------------------+-----+
|           PsyTrance|  100|
|           HardDance|  100|
|              Breaks|  100|
|  HardcoreHardTechno|  100|
|   IndieDanceNuDisco|  100|
|              Trance|  100|
|           DeepHouse|  100|
|ElectronicaDowntempo|  100|
|           ReggaeDub|  100|
|             Minimal|  100|
|         DrumAndBass|  100|
|             Dubstep|  100|
|             BigRoom|  100|
|              Techno|  100|
|               House|  100|
|         FutureHouse|  100|
|        ElectroHouse|  100|
|           GlitchHop|  100|
|           TechHouse|  100|
|              HipHop|  100|
+--------------------+-----+
only showing top 20 rows
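Every genre shown has exactly 100 songs (the dataset description says this holds for all 23; `show()` only prints the first 20 rows), so the classes are balanced. That gives a reference point for the accuracy scores later on: always predicting one genre, or guessing uniformly at random, should land near 1/23, about 4.3%. A quick plain-Python check, assuming exactly 100 songs per genre:

```python
# Chance-level accuracy for a balanced 23-class problem
# (assumes 100 songs per genre, as stated in the dataset description)
n_classes = 23
songs_per_class = 100
total_songs = n_classes * songs_per_class

# Always predicting one fixed genre is right only for that genre's share of songs
majority_baseline = songs_per_class / total_songs

# Uniform random guessing is right with probability 1/23 on every song
random_baseline = 1 / n_classes

print(f"{total_songs} songs, majority baseline = {majority_baseline:.4f}, "
      f"random baseline = {random_baseline:.4f}")
```

Against that baseline, the test accuracies reported further down (about 41% and 45%) are roughly nine to ten times chance.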



In [None]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer

beats = beats.withColumn("class", beats['class'].cast(StringType())) 
str_indxr = StringIndexer(inputCol="class",outputCol='label')
indexed = str_indxr.fit(beats).transform(beats)
# indexed.groupBy('label').count().show()
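By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), and the Spark 3.x documentation states that ties are broken alphabetically. Since every genre here has 100 songs, the indices end up alphabetical, which is consistent with `BigRoom` rows getting label `0.0` in the output further down. The plain-Python function below (the name `frequency_desc_index` is hypothetical) mimics that ordering rule; it is an illustration, not Spark's implementation:

```python
from collections import Counter

def frequency_desc_index(labels):
    """Mimic StringIndexer's default ordering: most frequent label -> 0.0,
    ties broken alphabetically."""
    counts = Counter(labels)
    # Sort by descending count first, then by the label string for ties
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# Toy labels: three genres tied at 2 songs, plus 'Techno' with 3
genres = ["Trance", "BigRoom", "Breaks", "Techno", "BigRoom", "Breaks",
          "Trance", "Techno", "Techno"]
mapping = frequency_desc_index(genres)
print(mapping)
```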

In [None]:
# beats.printSchema()

In [None]:
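# Data prep function
# Imports used inside the function (the next cell also imports them before calling it)
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.sql.functions import *  # note: this shadows Python's built-in min/max/sum with Spark column functions
from pyspark.sql.types import StringType

def MLClassifierDFPrep(df,input_columns,dependent_var,treat_outliers=True,treat_neg_values=True):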
    
    # Cast the label (class variable) to string so it can be re-indexed.
    # PySpark expects the label column to hold zero-indexed numeric values,
    # so we run StringIndexer on it in case the data is not already in that format.
    renamed = df.withColumn("label_str", df[dependent_var].cast(StringType()))  # copy the label as a string column
    indexer = StringIndexer(inputCol="label_str", outputCol="label")  # classifiers look for a column named 'label' by default
    indexed = indexer.fit(renamed).transform(renamed)

    # Convert every string-typed input column to a numeric index;
    # otherwise the algorithm will not be able to process it
    numeric_inputs = []
    string_inputs = []
    for column in input_columns:
        # isinstance is more robust than comparing str(dataType), whose text differs across Spark versions
        if isinstance(indexed.schema[column].dataType, StringType):
            indexer = StringIndexer(inputCol=column, outputCol=column+"_num")
            indexed = indexer.fit(indexed).transform(indexed)
            new_col_name = column+"_num"
            string_inputs.append(new_col_name)
        else:
            numeric_inputs.append(column)
            
    if treat_outliers:
        print("We are correcting for non-normality now!")
        # Dictionary of approximate 1st and 99th percentiles for each numeric column
        d = {}
        for col in numeric_inputs:
            # The last argument is the relative error: increase it to trade accuracy for speed
            d[col] = indexed.approxQuantile(col,[0.01,0.99],0.25)
        # Now floor/cap each skewed column at its percentiles and transform it
        for col in numeric_inputs:
            skew = indexed.agg(skewness(indexed[col])).collect()  # check for skewness
            skew = skew[0][0]
            # Right skew: floor, cap, then log(x + 1) (the +1 guards against log(0))
            if skew > 1:
                indexed = indexed.withColumn(col,
                    log(when(indexed[col] < d[col][0], d[col][0])
                        .when(indexed[col] > d[col][1], d[col][1])
                        .otherwise(indexed[col]) + 1))
                print(col+" has been treated for positive (right) skewness. (skew =",skew,")")
            # Left skew: floor, cap, then exp(x)
            elif skew < -1:
                indexed = indexed.withColumn(col,
                    exp(when(indexed[col] < d[col][0], d[col][0])
                        .when(indexed[col] > d[col][1], d[col][1])
                        .otherwise(indexed[col])))
                print(col+" has been treated for negative (left) skewness. (skew =",skew,")")

            
    # Warn if there are negative values, since Naive Bayes cannot handle them.
    # Note: only the numeric inputs need checking; indexed string columns are never negative
    minimums = indexed.select([min(c).alias(c) for c in numeric_inputs])  # per-column minimums (Spark's min, via the wildcard import)
    min_array = minimums.select(array(numeric_inputs).alias("mins"))  # pack the minimums into one array column
    df_minimum = min_array.select(array_min(min_array.mins)).collect()  # collect the global minimum as a Python object
    df_minimum = df_minimum[0][0]  # extract the number itself

    features_list = numeric_inputs + string_inputs
    assembler = VectorAssembler(inputCols=features_list,outputCol='features')
    output = assembler.transform(indexed).select('features','label')

    # If negative values remain, warn that Naive Bayes will not accept them
    if df_minimum < 0:
        print(" ")
        print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
        print(" ")
    
    if treat_neg_values:
        print("You have opted to correct that by rescaling all your features to a range of 0 to 1")
        print(" ")
        print("We are rescaling your dataframe....")
        scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

        # Compute summary statistics and generate MinMaxScalerModel
        scalerModel = scaler.fit(output)

        # rescale each feature to range [min, max].
        scaled_data = scalerModel.transform(output)
        final_data = scaled_data.select('label','scaledFeatures')
        final_data = final_data.withColumnRenamed('scaledFeatures','features')
        print("Done!")

    else:
        print("You have opted not to correct that, therefore you will not be able to use the Naive Bayes classifier")
        print("We will return the dataframe unscaled.")
        final_data = output
    
    return final_data
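The outlier branch of `MLClassifierDFPrep` floors and caps each numeric column at its approximate 1st and 99th percentiles, then applies `log(x + 1)` to right-skewed columns (skew > 1) or `exp(x)` to left-skewed ones (skew < -1); other columns are left alone. The sketch below reproduces that logic on a plain Python list so the effect is easy to inspect. Assumptions: skewness is the population (biased) moment estimate, which I believe matches Spark's `skewness` aggregate, and percentiles use a simple nearest-rank rule instead of Spark's `approxQuantile`; `treat_column` and `nearest_rank` are hypothetical helpers, not part of the notebook.

```python
import math

def skewness(xs):
    """Population (biased) skewness: mean cubed deviation divided by std**3."""
    n = len(xs)
    mean = math.fsum(xs) / n
    m2 = math.fsum((x - mean) ** 2 for x in xs) / n
    m3 = math.fsum((x - mean) ** 3 for x in xs) / n
    return m3 / m2 ** 1.5   # undefined (division by zero) for a constant column

def nearest_rank(xs, q):
    """Nearest-rank percentile; a simple stand-in for Spark's approxQuantile."""
    s = sorted(xs)
    idx = math.ceil(q * len(s)) - 1
    idx = 0 if idx < 0 else (len(s) - 1 if idx >= len(s) else idx)
    return s[idx]

def treat_column(xs, low_q=0.01, high_q=0.99):
    """Hypothetical helper mirroring the notebook's floor/cap + transform step."""
    skew = skewness(xs)
    if -1 <= skew <= 1:
        return list(xs), skew             # roughly symmetric: left untouched
    lo, hi = nearest_rank(xs, low_q), nearest_rank(xs, high_q)
    capped = [lo if x < lo else hi if x > hi else x for x in xs]  # floor, then cap
    if skew > 1:
        return [math.log(x + 1) for x in capped], skew   # right skew: log(x + 1)
    return [math.exp(x) for x in capped], skew           # left skew: exp(x)

# A column with a long right tail: mostly zeros plus a few large values
values = [0.0] * 90 + [1.0] * 5 + [50.0] * 5
treated, skew = treat_column(values)
print(skew, treated[-1])
```

The comparisons are written with `sorted()` and conditional expressions instead of `min()`/`max()` so the sketch still works if pasted after the notebook's `from pyspark.sql.functions import *`.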

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

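beats = beats.drop('_c0') # drop the unnamed row-index column
# NOTE: after dropping '_c0' the first column is '1-ZCRm', so [1:-1] also skips that feature
# (hence 70 features instead of 71 below); beats.columns[:-1] would keep all of them
input_columns = beats.columns[1:-1]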
dependent_var = 'class'

final_data = MLClassifierDFPrep(beats,input_columns,dependent_var)
final_data.limit(5).toPandas()

We are correcting for non normality now!
7-SpectralFluxm has been treated for positive (right) skewness. (skew =) 1.6396138160129063 )
22-ChromaVector1m has been treated for positive (right) skewness. (skew =) 2.4162415204309258 )
23-ChromaVector2m has been treated for positive (right) skewness. (skew =) 4.154796693680583 )
24-ChromaVector3m has been treated for positive (right) skewness. (skew =) 1.1974019617504328 )
25-ChromaVector4m has been treated for positive (right) skewness. (skew =) 2.446635863594906 )
26-ChromaVector5m has been treated for positive (right) skewness. (skew =) 2.154482876187508 )
27-ChromaVector6m has been treated for positive (right) skewness. (skew =) 2.01234064472543 )
28-ChromaVector7m has been treated for positive (right) skewness. (skew =) 1.1829228989215521 )
29-ChromaVector8m has been treated for positive (right) skewness. (skew =) 3.7372643733999955 )
30-ChromaVector9m has been treated for positive (right) skewness. (skew =) 2.4117416421548645 )
31-Chr

Unnamed: 0,label,features
0,0.0,"[0.30338998025826874, 0.8957733872366486, 0.61..."
1,0.0,"[0.37399300906442695, 0.8816024910399278, 0.56..."
2,0.0,"[0.44679648775095127, 0.7434979440255818, 0.46..."
3,0.0,"[0.5860529526571694, 0.7966282209370411, 0.518..."
4,0.0,"[0.5186704525553854, 0.8825805153845844, 0.725..."
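`MinMaxScaler` rescales each feature independently as `(x - E_min) / (E_max - E_min) * (max - min) + min`, where `E_min` and `E_max` are the column's observed extremes and `min`/`max` default to 0 and 1; Spark's documentation states that a constant column is mapped to `0.5 * (max + min)`. That is why every value in `features` above lies between 0 and 1. A plain-Python sketch of the formula (the helper name `min_max_scale` is hypothetical):

```python
def min_max_scale(column, new_min=0.0, new_max=1.0):
    """Rescale one feature column to [new_min, new_max], following MinMaxScaler's formula."""
    # sorted() rather than min()/max(): the notebook's wildcard import shadows those built-ins
    ordered = sorted(column)
    e_min, e_max = ordered[0], ordered[-1]
    if e_max == e_min:
        # Constant column: every value maps to the middle of the target range
        return [0.5 * (new_max + new_min)] * len(column)
    scale = (new_max - new_min) / (e_max - e_min)
    return [(x - e_min) * scale + new_min for x in column]

# MFCC-like values containing negatives (which Naive Bayes would reject)
mfcc = [-22.7, -21.8, -19.0, -25.4]
print(min_max_scale(mfcc))
print(min_max_scale([3.0, 3.0]))
```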


# Feature Selection

In [None]:
from pyspark.ml.feature import VectorSlicer

# Copy only the feature at index 1 of the assembled vector into a new column.
# Note: the models below still train on the full 'features' column, not 'selectedfeatures'.
slicer = VectorSlicer(inputCol="features", outputCol="selectedfeatures", indices=[1])

output = slicer.transform(final_data)

output.select("features","selectedfeatures").show()

+--------------------+--------------------+
|            features|    selectedfeatures|
+--------------------+--------------------+
|[0.30338998025826...|[0.8957733872366486]|
|[0.37399300906442...|[0.8816024910399278]|
|[0.44679648775095...|[0.7434979440255818]|
|[0.58605295265716...|[0.7966282209370411]|
|[0.51867045255538...|[0.8825805153845844]|
|[0.53704862759740...|[0.9366879335151957]|
|[0.36610043340353...|[0.8301498598791096]|
|[0.28833375902182...|[0.7913885661814694]|
|[0.51348937539023...|[0.8596386312593476]|
|[0.44188100301752...|[0.7311306494555135]|
|[0.56945373664701...|[0.9640035330681084]|
|[0.37538743054984...|[0.9590443978826131]|
|[0.59208221389722...|[0.7820484079296268]|
|[0.49602450451477...| [0.864391463449406]|
|[0.44102406047498...|[0.8260906140727508]|
|[0.30292002346477...|[0.8267150246079552]|
|[0.40585185504418...|[0.8113030840888685]|
|[0.42819003402878...|[0.8293505384319086]|
|[0.36745630460104...|[0.8428873788795865]|
|[0.47253668468282...|[0.8336434
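The `VectorSlicer` above keeps one hand-picked position of the feature vector. To let the data choose instead, Spark 3.1 and later offer `UnivariateFeatureSelector`, which for continuous features and a categorical label (`featureType="continuous"`, `labelType="categorical"`) scores features with the ANOVA F-test. As a rough illustration of what that score measures, here is a one-way ANOVA F statistic per feature in plain Python; `anova_f` and the toy data are hypothetical, and Spark's selector applies its own ranking and selection modes on top of the test.

```python
import math

def anova_f(values, labels):
    """One-way ANOVA F statistic of one continuous feature across label groups."""
    groups = {}
    for v, lab in zip(values, labels):
        groups.setdefault(lab, []).append(v)
    n, k = len(values), len(groups)
    grand_mean = math.fsum(values) / n
    # Between-group sum of squares: how far each group's mean sits from the grand mean
    ss_between = math.fsum(len(g) * (math.fsum(g) / len(g) - grand_mean) ** 2
                           for g in groups.values())
    # Within-group sum of squares: spread of values around their own group's mean
    ss_within = math.fsum((v - math.fsum(g) / len(g)) ** 2
                          for g in groups.values() for v in g)
    # Large F = groups differ a lot relative to the noise inside each group
    return (ss_between / (k - 1)) / (ss_within / (n - k))

labels = [0, 0, 0, 1, 1, 1]
features = {
    "separates_genres": [1.0, 1.1, 0.9, 5.0, 5.2, 4.8],
    "pure_noise":       [3.0, 1.0, 2.0, 2.0, 3.0, 1.0],
}
scores = {name: anova_f(vals, labels) for name, vals in features.items()}
ranked = sorted(scores, key=scores.get, reverse=True)
print(ranked, scores)
```

A feature whose values barely move between genres scores near zero and would be a candidate to drop.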

# Test and Train split

In [None]:
train,test = output.randomSplit([0.7,0.3])
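`randomSplit([0.7, 0.3])` assigns rows independently at random and, without its optional `seed` argument, produces a different split on every run, so the per-genre counts in `train` and `test` only hover around 70/30. If exact per-class proportions matter, a stratified split is one alternative (PySpark's `DataFrame.sampleBy` does approximate per-class sampling). A plain-Python sketch of an exact stratified split, using the hypothetical helper `stratified_split` and toy rows shaped like this dataset:

```python
import random
from collections import defaultdict

def stratified_split(rows, label_of, train_frac=0.7, seed=42):
    """Split rows so every class contributes the same fraction to the training set."""
    by_class = defaultdict(list)
    for row in rows:
        by_class[label_of(row)].append(row)
    rng = random.Random(seed)          # fixed seed -> reproducible split
    train, test = [], []
    for label in sorted(by_class):     # deterministic class order
        members = by_class[label][:]
        rng.shuffle(members)
        cut = int(train_frac * len(members) + 0.5)   # round to nearest: 70 of each 100
        train.extend(members[:cut])
        test.extend(members[cut:])
    return train, test

# Toy data shaped like the beats dataset: 23 genres x 100 songs each
rows = [(genre, i) for genre in range(23) for i in range(100)]
train, test = stratified_split(rows, label_of=lambda r: r[0])
print(len(train), len(test))
```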

# Model Training 

In [None]:
from pyspark.ml.classification import *
classifier = LogisticRegression()
fitModel = classifier.fit(train)
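With 23 classes, `LogisticRegression` (default `family="auto"`) fits a multinomial model: one coefficient row and one intercept per class, which is why the cross-validated model further down prints a 23-entry `interceptVector` and a multi-row `coefficientMatrix`. Prediction scores each class as `w_k · x + b_k`, turns the scores into probabilities with softmax, and picks the largest. A plain-Python sketch with made-up weights (`softmax_predict` is hypothetical):

```python
import math

def softmax_predict(coef_matrix, intercepts, x):
    """Multinomial logistic regression: class margins -> softmax probabilities -> argmax."""
    # Margin (raw score) for each class k: dot(w_k, x) + b_k
    margins = [math.fsum(w * xi for w, xi in zip(row, x)) + b
               for row, b in zip(coef_matrix, intercepts)]
    # Subtract the largest margin before exponentiating, for numerical stability
    # (sorted()[-1] instead of max(), which the notebook's wildcard import shadows)
    top = sorted(margins)[-1]
    exps = [math.exp(m - top) for m in margins]
    total = math.fsum(exps)
    probs = [e / total for e in exps]
    prediction = probs.index(sorted(probs)[-1])   # index of the most probable class
    return prediction, probs

# Made-up model with 3 classes and 2 features (the notebook's has 23 classes)
W = [[1.0, 0.0], [0.0, 1.0], [-1.0, -1.0]]
b = [0.0, 0.5, 0.0]
pred, probs = softmax_predict(W, b, [2.0, 1.0])
print(pred, probs)
```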

In [None]:
from pyspark.ml.evaluation import *

predictionAndLabels = fitModel.transform(test)
# Caution: BinaryClassificationEvaluator expects a binary (0/1) label and a score column,
# so this AUC is not meaningful for 23 genres; compare models on accuracy instead
Bin_evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')  # labelCol defaults to 'label'
auc = Bin_evaluator.evaluate(predictionAndLabels)
print("AUC:",auc)

predictions = fitModel.transform(test)
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")  # predictionCol defaults to 'prediction'
accuracy = (MC_evaluator.evaluate(predictions))*100
print("Accuracy: {0:.2f}".format(accuracy),"%")  # test error would be 100 - accuracy
print(" ")
print(" ")

AUC: 0.6631138975966562
Accuracy: 40.78 %
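For a 23-class problem, `MulticlassClassificationEvaluator` metrics are the ones to compare rather than a binary AUC. Besides `"accuracy"`, it supports `"f1"`, which averages per-class F1 scores weighted by how often each class occurs. A plain-Python sketch of both metrics (the helper names are hypothetical) makes the definitions concrete:

```python
import math

def accuracy(labels, predictions):
    """Fraction of rows whose predicted class equals the true class."""
    hits = [y == p for y, p in zip(labels, predictions)].count(True)
    return hits / len(labels)

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights = share of rows in each true class."""
    n = len(labels)
    parts = []
    for c in sorted(set(labels)):
        tp = fp = fn = 0  # true positives, false positives, false negatives for class c
        for y, p in zip(labels, predictions):
            tp += (y == c and p == c)
            fp += (y != c and p == c)
            fn += (y == c and p != c)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        parts.append((tp + fn) / n * f1)   # weight by the support of class c
    return math.fsum(parts)

y_true = [0, 0, 1, 1]
y_pred = [0, 1, 1, 1]
print(accuracy(y_true, y_pred), weighted_f1(y_true, y_pred))
```

Here class 0 has F1 = 2/3 and class 1 has F1 = 0.8, each with weight 0.5, so the weighted F1 is 11/15 while accuracy is 0.75.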
 


In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# First tell Spark which classifier you want to use
classifier = LogisticRegression() # if the label column is not named 'label', pass labelCol='otherName'

# Then Set up your parameter grid for the cross validator to conduct hyperparameter tuning
paramGrid = (ParamGridBuilder().addGrid(classifier.maxIter, [10, 15,20]).build())

# Then set up the Cross Validator which requires all of the following parameters:
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MC_evaluator,
                          numFolds=2) # 3 or more folds is common practice

# Then fit your model
fitModel = crossval.fit(train)

# Collect the best model and
# print the coefficient matrix.
# These values should be compared relative to each other,
# and the intercepts can be compared to other models
BestModel = fitModel.bestModel
print("Intercept: " + str(BestModel.interceptVector))
print("Coefficients: \n" + str(BestModel.coefficientMatrix))

# You can extract the best model from this run like this if you want
LR_BestModel = BestModel

# Next you need to generate predictions on the test dataset
# fitModel automatically uses the best model 
# so we don't need to use BestModel here
predictions = fitModel.transform(test)

# Now print the accuracy rate of the model or AUC for a binary classifier
accuracy = (MC_evaluator.evaluate(predictions))*100
print(accuracy)

Intercept: [-5.8246097394931144,2.143038716420355,-9.851664388302217,6.575159204983032,-1.9846042934564279,-5.541668848322515,-5.002297766169326,19.027938590111148,-6.91367397192708,-6.513653178418401,-2.626919398304925,-3.2232572188289836,19.30872102398721,-0.2260748703482572,-9.636478700694278,-2.7483136523041978,5.446921194125121,3.6818488425556595,4.348708896400585,-3.8332345908257466,1.8841117740095963,2.9434836330775997,-1.4334812582748337]
Coefficients: 
DenseMatrix([[ 4.31765631,  2.15559297,  0.94167009, ..., -5.39701384,
              -1.30066709,  3.43281318],
             [ 0.45787351,  2.25012623,  0.98992156, ...,  8.20584453,
              -8.67235361,  4.95465408],
             [-0.51917709,  5.38938598, -0.68156953, ..., -3.96097015,
               1.67525223, -2.85286866],
             ...,
             [-1.63015071, -0.87202127,  0.24679571, ..., -4.93867331,
               1.07628384,  2.32934222],
             [-1.67841431, -1.49300649,  0.08354119, ..., -3.6498708
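`CrossValidator` splits the training data into `numFolds` folds, trains one model per parameter combination on all but one fold, scores it on the held-out fold with the evaluator, averages the scores across folds, and finally refits the best combination on the whole training set (that refit is `bestModel`). The plain-Python sketch below follows those steps, with a toy one-feature k-nearest-neighbours model standing in for logistic regression and `k` playing the role of `maxIter`; `cross_validate`, `fit_knn` and the data are hypothetical, and Spark assigns rows to folds randomly rather than round-robin.

```python
import math

def cross_validate(data, param_grid, fit, evaluate, num_folds=3):
    """Grid search with k-fold CV: best mean held-out score wins, then refit on all data."""
    # Round-robin fold assignment for determinism (Spark assigns rows to folds randomly)
    folds = [data[i::num_folds] for i in range(num_folds)]
    avg_scores = []
    for param in param_grid:
        fold_scores = []
        for i in range(num_folds):
            train = [row for j, fold in enumerate(folds) if j != i for row in fold]
            model = fit(train, param)
            fold_scores.append(evaluate(model, folds[i]))   # score on the held-out fold
        avg_scores.append(math.fsum(fold_scores) / num_folds)
    # Higher is better (as with accuracy); sorted()[-1] avoids the shadowed max()
    best_index = avg_scores.index(sorted(avg_scores)[-1])
    best_param = param_grid[best_index]
    return fit(data, best_param), best_param, avg_scores

# Toy estimator: 1-D k-nearest neighbours, with k as the hyperparameter to tune
def fit_knn(train, k):
    return (train, k)   # k-NN "training" just stores the rows

def predict_knn(model, x):
    train, k = model
    nearest = sorted(train, key=lambda row: (x - row[0]) ** 2)[:k]
    ones = [label for _, label in nearest].count(1)
    return 1 if 2 * ones > k else 0   # majority vote (k is odd here)

def eval_accuracy(model, rows):
    hits = [predict_knn(model, x) == y for x, y in rows].count(True)
    return hits / len(rows)

data = [(float(x), int(x >= 10)) for x in range(20)]   # label 1 for x >= 10
best_model, best_k, scores = cross_validate(data, [1, 3, 5], fit_knn, eval_accuracy)
print(best_k, scores)
```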

* The multilayer perceptron classifier gave the best accuracy of the models tried here.

# Multilayer Perceptron Classifier

In [None]:
# Count how many features you have
features = final_data.select(['features']).limit(1).collect()
features_count = len(features[0][0])
# Count how many classes you have 
class_count = final_data.select(countDistinct("label")).limit(1).collect()
classes = class_count[0][0]

In [None]:
print(features_count)
print(classes)

70
23


In [None]:
from pyspark.ml.evaluation import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

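# Input layer = number of features, output layer = number of classes, two hidden layers in between
layers = [features_count, features_count+1, features_count, classes]
# Instantiate the classifier
classifier = MultilayerPerceptronClassifier(maxIter=200, layers=layers, blockSize=128, seed=1234)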

# Fit the model
fitModel = classifier.fit(train)

# Print the model Weights
print('\033[1m' + "Model Weights: "+ '\033[0m',fitModel.weights.size)
   
# Generate predictions on test dataframe
predictions = fitModel.transform(test)
# Calculate accuracy score
accuracy = (MC_evaluator.evaluate(predictions))*100
# Print accuracy score
print("Accuracy: ",accuracy)

Model Weights:  11714
Accuracy:  45.42815674891146
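The weight count printed above can be checked by hand. Each layer of the perceptron is fully connected to the next, so a layer of size `n_in` feeding a layer of size `n_out` contributes `(n_in + 1) * n_out` parameters (the `+ 1` is one bias per output unit). For `layers = [70, 71, 70, 23]` that gives 71·71 + 72·70 + 71·23 = 5041 + 5040 + 1633 = 11714, matching `fitModel.weights.size`. The same arithmetic in Python (`mlp_weight_count` is a hypothetical helper):

```python
def mlp_weight_count(layers):
    """Weights plus biases in a fully connected network with the given layer sizes."""
    total = 0
    for n_in, n_out in zip(layers, layers[1:]):
        total += (n_in + 1) * n_out   # n_in weights plus one bias per output unit
    return total

features_count, classes = 70, 23
layers = [features_count, features_count + 1, features_count, classes]
print(mlp_weight_count(layers))  # → 11714
```

With about 11.7k parameters and only around 1,600 training rows (roughly 70% of 2,300 songs), it may be worth comparing train and test accuracy to check for overfitting.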
