<a href="https://colab.research.google.com/github/econdatatech/AIML427/blob/main/AIML_427_Assignment_3_Individual_Part_PySpark_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab setup: run these shell commands once per Google Colab runtime.
# Install a headless Java 8 JDK; -qq and the redirect suppress apt's output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.0.0 binary distribution (Hadoop 3.2 build).
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
# Unpack it into the current directory. The next cell points SPARK_HOME at
# /content/spark-3.0.0-bin-hadoop3.2, i.e. it assumes the working directory is /content.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
# findspark (used via findspark.init() below) makes the Spark install under SPARK_HOME importable.
!pip install -q findspark

In [None]:
import os

# Tell Spark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [None]:
# Reinstall the pip pyspark package at a pinned version.
# NOTE(review): this pins pyspark 3.0.2, but SPARK_HOME points at a Spark 3.0.0
# distribution. findspark.init() adds SPARK_HOME's Python libraries to sys.path, so the
# 3.0.0 libraries are probably the ones actually imported. Consider pinning
# pyspark==3.0.0 (or downloading Spark 3.0.2) so the two versions agree — confirm.
!pip3 uninstall -y pyspark
!pip3 install pyspark==3.0.2

Found existing installation: pyspark 3.0.2
Uninstalling pyspark-3.0.2:
  Successfully uninstalled pyspark-3.0.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.2
  Using cached pyspark-3.0.2-py2.py3-none-any.whl
Installing collected packages: pyspark
Successfully installed pyspark-3.0.2


In [None]:
import findspark

# Must run before importing pyspark so the SPARK_HOME installation is found.
findspark.init()

from pyspark.sql import SparkSession

# Local session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)


In [None]:
import sys
from operator import add
import time
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.sql.functions import when, lit
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
import pandas as pd


In [None]:
# Load the per-track feature matrix with Spark.
# The first n_skip_rows lines of features.csv are header lines; skip them in the RDD.
# The column names are rebuilt from pandas below.
n_skip_rows = 3
row_rdd = (
    spark.sparkContext.textFile('features.csv')
    .zipWithIndex()                                # (line, line_number)
    .filter(lambda row: row[1] >= n_skip_rows)     # drop the leading header lines
    .map(lambda row: row[0])                       # back to plain text lines
)

# Read just a few rows with pandas to recover the 3-level header, then flatten each
# (level0, level1, level2) tuple into one name joined with '_'.
cols = pd.read_csv('features.csv', nrows=5, header=[0, 1, 2])
cols.columns = ['_'.join(col).strip() for col in cols.columns.values]
cols = cols.rename(columns={"feature_statistics_number": 'track_id'})

# header='true' consumes the first remaining line as a header; toDF then replaces the
# column names with the flattened ones.
features = spark.read.csv(row_rdd, header='true', inferSchema='true').toDF(*cols.columns)

# Track id plus the genre column (source column 'track.7', renamed to 'Genre').
trackspd = (
    pd.read_csv('tracks.csv', skiprows=[1, 2], usecols=['Unnamed: 0', 'track.7'])
    .rename(columns={'Unnamed: 0': 'track_id', "track.7": "Genre"})
    # Missing genres are float NaN in pandas. Without Arrow, createDataFrame verifies the
    # schema, and a StringType field can reject a float NaN. Such rows could never pass
    # the later Rock/Experimental filter, so dropping them does not change the result.
    .dropna(subset=['Genre'])
)
mySchema = StructType([StructField("track_id", IntegerType(), True),
                       StructField("Genre", StringType(), True)])
tracks = spark.createDataFrame(trackspd, schema=mySchema)


## Fallback: the commented-out cell below downloads pre-wrangled data from GitHub (uncomment it if the loading cell above fails)

In [None]:
# Fallback in case the loading cell above fails
#!wget -O mfa_wrangled.bz2 https://github.com/econdatatech/AIML427/blob/main/mfa_wrangled.bz2?raw=true
#data = spark.read.option("sep", ",").csv("mfa_wrangled.bz2",  header ='true',inferSchema='true')
#new_cols=(column.replace('.', '_') for column in data.columns)
#data = data.toDF(*new_cols)

In [None]:
# Attach genres, keep only the two target genres, and encode the label as a string:
# '1' for Rock, '0' for Experimental. The genre and id columns are then dropped.
joined = features.join(tracks, ['track_id'], how='inner')
two_genres = joined.filter(joined['Genre'].isin(['Rock', 'Experimental']))
is_rock = two_genres.Genre == 'Rock'
data = (
    two_genres
    .withColumn('label', when(is_rock, lit('1')).otherwise('0'))
    .drop('Genre')
    .drop('track_id')
)

In [None]:
# Assemble every raw feature column into one vector column named 'features'.
# The names come from `features` (all of its columns except the id) rather than
# `data.columns[:-1]`. That way the cell does not depend on 'label' being the last
# column, and a re-run does not pull 'label' or 'features' into the inputs.
feature_cols = [c for c in features.columns if c != 'track_id']
assembler = VectorAssembler().setInputCols(feature_cols).setOutputCol('features')
# drop() is a no-op on the first run. On a re-run it removes the previous output,
# because VectorAssembler will not overwrite an existing column.
data = assembler.transform(data.drop('features'))

# Map the string label ('0'/'1') to a numeric 'indexedLabel'.
labelIndexer = StringIndexer(inputCol='label', outputCol="indexedLabel").fit(data)
# Vector slots with at most maxCategories distinct values are treated as categorical;
# the rest stay continuous.
# NOTE(review): both indexers are fit on all rows, including those that later land in the
# test split. Fitting inside the pipeline would avoid that, but could fail on categories
# unseen in training.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [None]:
# Baseline: GBT on the raw (unscaled) assembled features.
start = time.time()

seed = 23
trainingData, testData = data.randomSplit([0.7, 0.3], seed)

# labelIndexer -> featureIndexer -> GBT. Fitting the pipeline also runs the
# (already fitted) indexer stages on the training split.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(trainingData)

predictions_train, predictions_test = (model.transform(df) for df in (trainingData, testData))

# A few test predictions for a quick sanity check.
predictions_test.select("prediction", "indexedLabel", "features").show(5)

# Accuracy: share of rows where prediction equals indexedLabel.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_train, accuracy_test = (evaluator.evaluate(p) for p in (predictions_train, predictions_test))

end = time.time()

# Metric table; the run time is in minutes.
dfnor = spark.createDataFrame(
    [
        ('Train accuracy', accuracy_train),
        ('Test accuracy', accuracy_test),
        ('Run time', (end - start) / 60),
    ],
    ['ValueType', 'Value'],
)

gbtModel = model.stages[2]
print(gbtModel)  # summary only


+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|[-1.5479236841,-1...|
|       0.0|         0.0|[-1.1817407608,0....|
|       1.0|         1.0|[-0.91913110018,-...|
|       1.0|         0.0|[-0.85557812452,-...|
|       0.0|         0.0|[-0.79573595524,-...|
+----------+------------+--------------------+
only showing top 5 rows

GBTClassificationModel: uid = GBTClassifier_c61a14101545, numTrees=20, numClasses=2, numFeatures=518


In [None]:
# Baseline (unscaled features) metrics; run time is in minutes.
dfnor.show(n=20, truncate=True)

+--------------+------------------+
|     ValueType|             Value|
+--------------+------------------+
|Train accuracy|0.8683315070073245|
| Test accuracy|0.8304925513353912|
|      Run time|15.423114673296611|
+--------------+------------------+



In [None]:
# Standard scaling (zero mean, unit variance per feature), then the same GBT pipeline.
start = time.time()

# drop() is a no-op on the first run. On a re-run it removes the previous output column,
# because both fit and transform refuse to overwrite an existing 'normFeatures' column.
train_base = trainingData.drop('normFeatures')
test_base = testData.drop('normFeatures')

scaler = StandardScaler(withMean=True, withStd=True, inputCol="features", outputCol="normFeatures")
# Fit the scaling statistics on the training split only, so test rows cannot influence them.
scaly = scaler.fit(train_base)

data = scaly.transform(data.drop('normFeatures'))
featureIndexer = VectorIndexer(inputCol="normFeatures", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Scale the existing splits rather than re-splitting `data`. This guarantees the rows the
# scaler was fit on are exactly the training rows, without relying on randomSplit
# reproducing the same partition on a transformed DataFrame.
trainingData = scaly.transform(train_base)
testData = scaly.transform(test_base)

# Train a GBT model on the indexed, scaled features.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and GBT in a Pipeline; fitting it also runs the (already fitted) indexers.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(trainingData)

# Predictions on both splits.
predictions_train = model.transform(trainingData)
predictions_test = model.transform(testData)

# Select example rows to display.
predictions_test.select("prediction", "indexedLabel", "normFeatures").show(5)

# Accuracy on each split.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions_test)

end = time.time()

# Metric table; the run time is in minutes.
columns = ['ValueType', 'Value']
vals = [
    ('Train accuracy', accuracy_train),
    ('Test accuracy', accuracy_test),
    ('Run time', (end - start) / 60),
]
dfsc = spark.createDataFrame(vals, columns)

gbtModel = model.stages[2]
print(gbtModel)  # summary only

+----------+------------+--------------------+
|prediction|indexedLabel|        normFeatures|
+----------+------------+--------------------+
|       1.0|         1.0|[-0.8660555467494...|
|       0.0|         0.0|[-0.6990745307516...|
|       1.0|         1.0|[-0.5793233759299...|
|       1.0|         0.0|[-0.5503429408245...|
|       0.0|         0.0|[-0.5230546496726...|
+----------+------------+--------------------+
only showing top 5 rows

GBTClassificationModel: uid = GBTClassifier_2417f898e2ca, numTrees=20, numClasses=2, numFeatures=518


In [None]:
# Standard-scaled run metrics; run time is in minutes.
dfsc.show(n=20, truncate=True)

+--------------+------------------+
|     ValueType|             Value|
+--------------+------------------+
|Train accuracy|0.8668896706845839|
| Test accuracy|0.8255267749295396|
|      Run time| 17.32752676407496|
+--------------+------------------+



In [None]:
# PCA on the standardized features, fitted on the training split only.
# The timer started here is read in the next cell, so that run time includes the PCA step.
start = time.time()

# Number of principal components. It used to be hard-coded as 518 (the assembled feature
# count); it is now derived so it cannot drift from the feature set. Keeping every dimension
# makes PCA a pure rotation of the scaled features; lower pca_k to actually reduce dimensionality.
pca_k = len(assembler.getInputCols())
pca = PCA(k=pca_k, inputCol="normFeatures", outputCol="pca")
# drop() keeps the cell re-runnable: PCA fit/transform refuse to overwrite an existing 'pca'
# column. The fitted model gets its own name instead of shadowing the GBT pipeline `model`.
pca_model = pca.fit(trainingData.drop('pca'))

data = pca_model.transform(data.drop('pca'))


In [None]:
# GBT on the PCA-projected features. There is no new `start` here: the timer began in the
# PCA cell, so the reported run time also covers fitting and applying the PCA.
featureIndexer = VectorIndexer(inputCol="pca", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Same seed as the earlier splits.
trainingData, testData = data.randomSplit([0.7, 0.3], seed)

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(trainingData)  # also runs the indexer stages

predictions_train, predictions_test = [model.transform(split) for split in (trainingData, testData)]

predictions_test.select("prediction", "indexedLabel", "pca").show(5)

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy_train = evaluator.evaluate(predictions_train)
accuracy_test = evaluator.evaluate(predictions_test)

end = time.time()
elapsed_minutes = (end - start) / 60

# Metric table; the run time is in minutes.
dfpca = spark.createDataFrame(
    [('Train accuracy', accuracy_train), ('Test accuracy', accuracy_test), ('Run time', elapsed_minutes)],
    ['ValueType', 'Value'],
)

gbtModel = model.stages[2]
print(gbtModel)  # summary only

+----------+------------+--------------------+
|prediction|indexedLabel|                 pca|
+----------+------------+--------------------+
|       1.0|         1.0|[8.35891430112388...|
|       0.0|         0.0|[2.18806224797266...|
|       1.0|         1.0|[5.93260669459259...|
|       0.0|         0.0|[6.32561157549543...|
|       0.0|         0.0|[-1.4067773142646...|
+----------+------------+--------------------+
only showing top 5 rows

GBTClassificationModel: uid = GBTClassifier_b08a96e27f33, numTrees=20, numClasses=2, numFeatures=518


In [None]:
# PCA run metrics; run time is in minutes.
dfpca.show(n=20, truncate=True)

+--------------+------------------+
|     ValueType|             Value|
+--------------+------------------+
|Train accuracy|0.8486648595651421|
| Test accuracy| 0.807408401556838|
|      Run time|  20.3068524201711|
+--------------+------------------+



In [None]:
# Number of rows in the Rock/Experimental data set used by all three runs.
row_count = data.count()
row_count

24790