# Lyrics sentiment analysis and prediction using pyspark

https://github.com/chaithanya21/Sentiment-Analysis-using-Pyspark-on-Multi-Social-Media-Data/blob/master/pyspark_ml_models.ipynb
https://www.kaggle.com/datasets/cakiki/muse-the-musical-sentiment-dataset?select=muse_v3.csv
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.evaluation.MultilabelClassificationEvaluator.html

This notebook file reads 2 CSV files, one with Songs lyricss and other with Songs classified by sentiments


## Adding dependencies

In [1]:
from IPython import display
import math
import pandas as pd
import numpy as np

from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *

## Creating Spark context

In [2]:
sc =SparkContext()
sqlContext = SQLContext(sc)

24/03/21 12:36:45 WARN Utils: Your hostname, af-Inspiron-7566 resolves to a loopback address: 127.0.1.1; using 192.168.1.65 instead (on interface wlp3s0)
24/03/21 12:36:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/21 12:36:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Creating Schema for Emotions file

In [3]:
shcemaEmotions = StructType([
    StructField("lastfm_url", StringType()),
    StructField("title", StringType()),
    StructField("artist", StringType()),
    StructField("seeds", StringType()),
    StructField("number_of_emotion_tags", StringType()),
    StructField("valence_tags", StringType()),
    StructField("arousal_tags", StringType()),
    StructField("dominance_tags", StringType()),
    StructField("mbid", StringType()),
    StructField("spotify_id", StringType()),
    StructField("genre", StringType())
])

## Creating Schema for Lyrics file

In [4]:
schemaLyrics = StructType([
    StructField("title", StringType()),
    StructField("tag", StringType()),
    StructField("artist", StringType()),
    StructField("year", StringType()),
    StructField("views", StringType()),
    StructField("features", StringType()),
    StructField("lyrics", StringType()),
    StructField("id", StringType()),
    StructField("language_cld3", StringType()),
    StructField("language_ft", StringType()),
    StructField("language", StringType())
])

## Definition of files

In [5]:
#song_lyrics.csv file contains 3093218 songs data
classificationFile = '/home/af/Desktop/Spark/songs_clasification.csv'
lyricsFile = '/home/af/Desktop/Spark/song_lyrics.csv'
outputFile = '/home/af/Desktop/Spark/completes.csv'

Reading CSV files for Emotions and Lyrics

In [6]:
dfE = sqlContext.read.format("csv").option("header", "true").schema(shcemaEmotions).load(classificationFile)#.limit(2000)
dfL = sqlContext.read.format("csv").option("ignoreLeadingWhiteSpace", "true").option("multiline", "true").option('quote','"').option('escape', '"').option("header", "true").schema(schemaLyrics).load(lyricsFile)#.limit(2000)

In [7]:
columns_to_drop = ['lastfm_url', 'mbid', 'spotify_id']
dfE = dfE.drop(*columns_to_drop)
dfE.show(5)
dfE.count()

+----------------+---------+--------------------+----------------------+-----------------+------------------+-----------------+-------+
|           title|   artist|               seeds|number_of_emotion_tags|     valence_tags|      arousal_tags|   dominance_tags|  genre|
+----------------+---------+--------------------+----------------------+-----------------+------------------+-----------------+-------+
|'Till I Collapse|   Eminem|      ['aggressive']|                     6|             4.55| 5.273124999999999|         5.690625|    rap|
|       St. Anger|Metallica|      ['aggressive']|                     8|             3.71| 5.832999999999999|5.427250000000002|  metal|
|        Speedin'|Rick Ross|      ['aggressive']|                     1|             3.08|              5.87|             5.49|    rap|
|    Bamboo Banga|   M.I.A.|['aggressive', 'f...|                    13|6.555071428571428|5.5372142857142865|5.691357142857143|hip-hop|
|      Die MF Die|     Dope|      ['aggressive']

90001

In [8]:
dfL = dfL.where(dfL.language == "en")
columns_to_drop = ['views', 'tag', 'features', 'id', 'language_cld3', 'language_ft', 'language']
dfL = dfL.drop(*columns_to_drop)
dfL.show(5)
dfL.count()

+-----------------+---------+----+--------------------+
|            title|   artist|year|              lyrics|
+-----------------+---------+----+--------------------+
|        Killa Cam|  Cam'ron|2004|[Chorus: Opera St...|
|       Can I Live|    JAY-Z|1996|[Produced by Irv ...|
|Forgive Me Father| Fabolous|2003|Maybe cause I'm e...|
|     Down and Out|  Cam'ron|2004|[Produced by Kany...|
|           Fly In|Lil Wayne|2005|[Intro]\nSo they ...|
+-----------------+---------+----+--------------------+
only showing top 5 rows



                                                                                

3374198

## Creating a combinated list of both

In [9]:
innerJoin = dfE.join(dfL, ["artist", "title"],"inner")
innerJoin.show(5)

[Stage 9:>                                                          (0 + 1) / 1]

+------------+--------------------+------------+----------------------+-----------------+------------------+-----------------+-----------------+----+--------------------+
|      artist|               title|       seeds|number_of_emotion_tags|     valence_tags|      arousal_tags|   dominance_tags|            genre|year|              lyrics|
+------------+--------------------+------------+----------------------+-----------------+------------------+-----------------+-----------------+----+--------------------+
|     Afroman|                Hush|['positive']|                     1|             7.57|               5.5|             7.26|          hip-hop|2000|[Hook] (Afroman t...|
|  Aimee Mann|              You Do|  ['smooth']|                    15|5.512301587301589|3.2575396825396825|5.478571428571429|singer-songwriter|1999|[Verse 1]\nYou st...|
|  Air Supply|Even the Nights A...|['romantic']|                     4|7.420000000000001|            4.9625|5.911666666666666|        soft rock|1

                                                                                

In [10]:
from pyspark.sql.functions import col
innerJoin.groupBy("artist").count().orderBy(col("count").desc()).show(50)

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|           Bob Dylan|   82|
|        Warren Zevon|   76|
|They Might Be Giants|   74|
|     Robbie Williams|   73|
|            The Cure|   73|
|           Radiohead|   67|
|         The Beatles|   67|
|Manic Street Prea...|   63|
|       Chelsea Wolfe|   59|
|  The Mountain Goats|   59|
|         of Montreal|   58|
|           Kate Bush|   55|
|           Tori Amos|   54|
|       Elliott Smith|   53|
|    Barenaked Ladies|   53|
|         Bright Eyes|   52|
|   Animal Collective|   51|
|         Yo La Tengo|   51|
|      Regina Spektor|   50|
|       Kylie Minogue|   49|
|           Tom Waits|   49|
| The Magnetic Fields|   48|
|           Cat Power|   47|
|            Coldplay|   46|
|                Cold|   46|
|         David Bowie|   46|
|        Jack Johnson|   45|
|                Beck|   45|
|      Sufjan Stevens|   45|
|        The National|   43|
|The Smashing Pump...|   42|
|        Lana 

                                                                                

In [11]:
innerJoin.count()

                                                                                

27349

## Generating file to train model

In [12]:
innerJoin.toPandas().to_csv(outputFile, index=False)

                                                                                

In [13]:
innerJoin.printSchema()

root
 |-- artist: string (nullable = true)
 |-- title: string (nullable = true)
 |-- seeds: string (nullable = true)
 |-- number_of_emotion_tags: string (nullable = true)
 |-- valence_tags: string (nullable = true)
 |-- arousal_tags: string (nullable = true)
 |-- dominance_tags: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- year: string (nullable = true)
 |-- lyrics: string (nullable = true)



## Preprocessing

In [14]:
innerJoin.groupBy("seeds").count().orderBy(col("count").desc()).show(400)

[Stage 27:>                                                         (0 + 1) / 1]

+--------------------+-----+
|               seeds|count|
+--------------------+-----+
|            ['sexy']|  473|
|           ['angry']|  430|
|          ['smooth']|  393|
|            ['epic']|  381|
|      ['aggressive']|  358|
|           ['happy']|  357|
|          ['mellow']|  355|
|             ['fun']|  326|
|        ['romantic']|  323|
|       ['energetic']|  322|
|            ['dark']|  321|
|          ['sleazy']|  316|
|            ['lush']|  302|
|     ['bittersweet']|  298|
|        ['powerful']|  295|
|           ['sweet']|  286|
|          ['quirky']|  285|
|         ['lyrical']|  279|
|             ['sad']|  278|
|           ['quiet']|  276|
|       ['uplifting']|  274|
|         ['intense']|  265|
|            ['soft']|  259|
|           ['light']|  256|
|            ['calm']|  252|
|        ['positive']|  250|
|         ['sensual']|  248|
|     ['sentimental']|  247|
|           ['silly']|  241|
|       ['nostalgic']|  241|
|['sarcastic', 'sa...|  238|
|          ['d

                                                                                

## Model Pipeline

Spark Machine Learning Pipelines API is similar to Scikit-Learn. Our pipeline includes three steps:

regexTokenizer: Tokenization (with Regular Expression)

stopwordsRemover: Remove Stop Words

countVectors: Count vectors (“document-term vectors”)

In [15]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="seeds", outputCol="words", pattern="\\W")

# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"]

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=30000, minDF=5)

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "seeds", outputCol = "label")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(innerJoin)
dataset = pipelineFit.transform(innerJoin)
dataset.show(5)

[Stage 40:>                                                         (0 + 1) / 1]

+------------+--------------------+------------+----------------------+-----------------+------------------+-----------------+-----------------+----+--------------------+----------+----------+----------------+-----+
|      artist|               title|       seeds|number_of_emotion_tags|     valence_tags|      arousal_tags|   dominance_tags|            genre|year|              lyrics|     words|  filtered|        features|label|
+------------+--------------------+------------+----------------------+-----------------+------------------+-----------------+-----------------+----+--------------------+----------+----------+----------------+-----+
|     Afroman|                Hush|['positive']|                     1|             7.57|               5.5|             7.26|          hip-hop|2000|[Hook] (Afroman t...|[positive]|[positive]|(240,[36],[1.0])| 25.0|
|  Aimee Mann|              You Do|  ['smooth']|                    15|5.512301587301589|3.2575396825396825|5.478571428571429|singer-son

                                                                                

## Partition Training & Test sets

In [22]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

ConnectionRefusedError: [Errno 111] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/af/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/af/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/af/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


In [21]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0).select("seeds","probability","label","prediction")\
.orderBy("probability", ascending=False).show(n = 10, truncate = 30)

ConnectionRefusedError: [Errno 111] Connection refused

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
lrAccuracy = evaluator.evaluate(predictions)
print(lrAccuracy)

ConnectionRefusedError: [Errno 111] Connection refused

## ROC curve
https://spark.apache.org/docs/latest/ml-classification-regression.html#multinomial-logistic-regression

https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/classification/LogisticRegressionTrainingSummary.html

In [None]:
trainingSummary = lrModel.summary

# for multiclass, we can inspect metrics on a per-label basis
print("\nFalse positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("\nTrue positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("\nPrecision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("\nRecall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("\nF-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("\nAccuracy: %s\n\nFPR: %s\n\nTPR: %s\n\nF-measure: %s\n\nPrecision: %s\n\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

## Logistic Regression using TF-IDF Features

In [None]:
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=30000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

pipelineFit = pipeline.fit(innerJoin)
dataset = pipelineFit.transform(innerJoin)

(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

predictions = lrModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("seeds","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

## Cross-Validation
Let’s now try cross-validation to tune our hyper parameters, and we will only tune the count vectors Logistic Regression.

In [None]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

pipelineFit = pipeline.fit(innerJoin)
dataset = pipelineFit.transform(innerJoin)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)
#print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

## Naive Bayes

In [None]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
nbModel = nb.fit(trainingData)
predictions = nbModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("seeds","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
nbAccuracy = evaluator.evaluate(predictions)
print(nbAccuracy)