In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf=pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc=pyspark.SparkContext(conf=conf)
spark=SparkSession(sc)


In [2]:
df = spark.read.csv("Merged dataset.csv",header=True,inferSchema=True).select("genre","lyrics")

In [3]:
#df = spark.read.csv("Mendeley dataset.csv",header=True,inferSchema=True).select("artist_name","track_name","release_date","genre","lyrics")

In [4]:
df=df.na.drop()

In [5]:
df.show(5)

+-----+--------------------+
|genre|              lyrics|
+-----+--------------------+
|  pop|hold time feel br...|
|  pop|believe drop rain...|
|  pop|sweetheart send l...|
|  pop|kiss lips want st...|
|  pop|till darling till...|
+-----+--------------------+
only showing top 5 rows



In [6]:
from pyspark.ml.feature import StopWordsRemover, CountVectorizer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import numpy as np
import pandas as ps


In [7]:
prepositions = ['aboard', 'about', 'above', 'across', 'after', 'against', 'along', 'amid', 'among', 'around', 'as', 'at', 'before', 'behind', 'below', 'beneath', 'beside', 'between', 'beyond', 'but', 'by', 'concerning', 'considering', 'despite', 'down', 'during', 'except', 'following', 'for', 'from', 'in', 'inside', 'into', 'like', 'minus', 'near', 'of', 'off', 'on', 'onto', 'opposite', 'outside', 'over', 'past', 'per', 'plus', 'regarding', 'round', 'save', 'since', 'through', 'to', 'toward', 'under', 'underneath', 'unlike', 'until', 'up', 'upon', 'versus', 'via', 'with', 'within', 'without']


In [8]:
stopwords = StopWordsRemover().getStopWords() + prepositions


In [9]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, StringIndexer, IndexToString
from pyspark.ml import Pipeline

# Define your feature extraction pipeline
tokenizer = RegexTokenizer(inputCol="lyrics", outputCol="words", pattern="\\W")
stopwords = StopWordsRemover(inputCol="words", outputCol="filtered")
prepositions = StopWordsRemover(inputCol="filtered", outputCol="clean", stopWords=prepositions)
vectorizer = HashingTF(inputCol="clean", outputCol="features", numFeatures=10000)

indexers = StringIndexer(inputCol="genre",  outputCol="labelIndex") # stage to convert targt variable

pipeline = Pipeline(stages=[tokenizer, stopwords, prepositions, vectorizer,indexers])


In [10]:
df_features = pipeline.fit(df).transform(df)



In [11]:
df_features.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|genre|              lyrics|               words|            filtered|               clean|            features|labelIndex|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  pop|hold time feel br...|[hold, time, feel...|[hold, time, feel...|[hold, time, feel...|(10000,[266,668,6...|       0.0|
|  pop|believe drop rain...|[believe, drop, r...|[believe, drop, r...|[believe, drop, r...|(10000,[309,315,3...|       0.0|
|  pop|sweetheart send l...|[sweetheart, send...|[sweetheart, send...|[sweetheart, send...|(10000,[198,564,7...|       0.0|
|  pop|kiss lips want st...|[kiss, lips, want...|[kiss, lips, want...|[kiss, lips, want...|(10000,[219,478,4...|       0.0|
|  pop|till darling till...|[till, darling, t...|[till, darling, t...|[till, darling, t...|(10000,[253,493,1...|       0.0|
+-----+-

In [12]:
df_features.select("labelIndex","features").show(5)

+----------+--------------------+
|labelIndex|            features|
+----------+--------------------+
|       0.0|(10000,[266,668,6...|
|       0.0|(10000,[309,315,3...|
|       0.0|(10000,[198,564,7...|
|       0.0|(10000,[219,478,4...|
|       0.0|(10000,[253,493,1...|
+----------+--------------------+
only showing top 5 rows



In [13]:
df_features.select("genre").distinct().show(50)

+-------+
|  genre|
+-------+
|    pop|
|  blues|
|    rap|
|   jazz|
|hip hop|
|country|
|   rock|
| reggae|
+-------+



In [14]:
Lable_array =  np.array(df_features.select("labelIndex","genre").distinct().collect())

In [15]:
Lable_array

array([['3.0', 'blues'],
       ['0.0', 'pop'],
       ['4.0', 'rock'],
       ['1.0', 'country'],
       ['5.0', 'jazz'],
       ['7.0', 'hip hop'],
       ['2.0', 'rap'],
       ['6.0', 'reggae']], dtype='<U32')

In [16]:
(trainingData, testData) = df_features.randomSplit([0.8, 0.2])

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt_model = DecisionTreeClassifier(labelCol="labelIndex",featuresCol="features")
model = dt_model.fit(trainingData)

In [18]:
model

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_781a37c68953, depth=5, numNodes=57, numClasses=8, numFeatures=10000

In [19]:
Prediction= model.transform(testData)


In [20]:
Prediction.select("labelIndex","probability","prediction").show()

+----------+--------------------+----------+
|labelIndex|         probability|prediction|
+----------+--------------------+----------+
|       3.0|[0.11867935752528...|       2.0|
|       3.0|[0.19958129797627...|       1.0|
|       3.0|[0.11867935752528...|       2.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.27945101029355...|       1.0|
|       3.0|[0.23132313231323...|       0.0|
|       3.0|[0.23132313231323...|       0.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.23962882096069...|       0.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.11867935752528...|       2.0|
|       3.0|[0.23962882096069...|       0.0|
|       3.0|[0.23132313231323...|       0.0|
|       3.0|[0.23132313231323...|       0.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.0|[0.11867935752528...|       2.0|
|       3.0|[0.25190951821386...|       0.0|
|       3.

In [21]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(Prediction)

print(f"Accuracy: {accuracy:.2f}")

Accuracy: 0.33


In [22]:
row_index = 4  # the index of the row to get the prediction value for
prediction_column_name = "probability"  # the name of the prediction column
prediction_value = Prediction.select(prediction_column_name).collect()[row_index][0]

print(f"Prediction value for row {row_index}: {prediction_value}")

Prediction value for row 4: [0.25190951821386603,0.19726792009400704,0.020710928319623973,0.17802585193889542,0.12162162162162163,0.1150117508813161,0.09944183313748531,0.01601057579318449]


In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(Prediction)
print(accuracy)


0.3251552324700894


In [24]:
testData.show(5)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|genre|              lyrics|               words|            filtered|               clean|            features|labelIndex|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|blues|accuse peep thing...|[accuse, peep, th...|[accuse, peep, th...|[accuse, peep, th...|(10000,[394,2733,...|       3.0|
|blues|ache head little ...|[ache, head, litt...|[ache, head, litt...|[ache, head, litt...|(10000,[290,316,3...|       3.0|
|blues|acoustical sweetn...|[acoustical, swee...|[acoustical, swee...|[acoustical, swee...|(10000,[535,1029,...|       3.0|
|blues|aggies aggies agg...|[aggies, aggies, ...|[aggies, aggies, ...|[aggies, aggies, ...|(10000,[166,316,3...|       3.0|
|blues|alive dead wrong ...|[alive, dead, wro...|[alive, dead, wro...|[alive, dead, wro...|(10000,[387,419,4...|       3.0|
+-----+-

In [25]:
model.save("DTmodel_new_for_Merged_dataset_CSV")

Py4JJavaError: An error occurred while calling o489.save.
: java.io.IOException: Path DTmodel_new_for_Merged_dataset_CSV already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
