### Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace, trim
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer,StopWordsRemover, Word2Vec, StringIndexer, IndexToString
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from nltk.stem.snowball import SnowballStemmer
from util import cleanser, lower_case, stemmer

In [2]:
# Create (or reuse) the notebook's Spark session with 4g configured for both
# the driver and the executors.
spark = (
    SparkSession.builder
    .appName("Music Genre Prediction")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Report the Spark version this kernel is running against.
print("Spark version = ", spark.version)

Spark version =  3.3.2


### Data Loading

    Load mendeley dataset

In [None]:
# Read the Mendeley CSV, using its first row as the column names.
mendeley_music_df = spark.read.csv("../data/mendeley_dataset.csv", header=True)

# Preview the first five rows.
mendeley_music_df.show(5)

In [None]:
# Keep only the columns used in the rest of the notebook.
mendeley_music_selected_df = mendeley_music_df.select(
    "artist_name",
    "track_name",
    "release_date",
    "genre",
    "lyrics",
)

# Preview the first five rows of the reduced frame.
mendeley_music_selected_df.show(5)

In [None]:
# Print the column names and types of the selected Mendeley columns.
mendeley_music_selected_df.printSchema()

In [None]:
# Count the rows per genre to see how the classes are distributed.
mendeley_music_selected_df.groupBy("genre").count().show()

    Load merged dataset

In [3]:
# Read the merged CSV (first row = column names) and keep only the columns
# used in the rest of the notebook.
merged_df = (
    spark.read.csv("../Merged_dataset.csv", header=True)
    .select("artist_name", "track_name", "release_date", "genre", "lyrics")
)

# Preview the first five rows.
merged_df.show(5)

+--------------------+--------------------+------------+-----+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|
+--------------------+--------------------+------------+-----+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|
|giorgos papadopoulos|  apopse eida oneiro|        1950|  pop|till darling till...|
+--------------------+--------------------+------------+-----+--------------------+
only showing top 5 rows



In [4]:
# Print the column names and types of the merged dataset.
merged_df.printSchema()

root
 |-- artist_name: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- lyrics: string (nullable = true)



In [5]:
# Count the rows per genre in the merged dataset.
merged_df.groupBy("genre").count().show()

+-------+-----+
|  genre|count|
+-------+-----+
|    pop| 7042|
|country| 5445|
|  blues| 4604|
|   jazz| 3845|
|   rock| 4034|
| reggae| 2498|
|hip hop|  904|
|  retro|  187|
+-------+-----+



### Data Pre-Processing - Without Pipeline

    Remove punctuation symbols and double spaces

In [None]:
# Raw strings keep the regex backslashes literal. The previous non-raw "\w" / "\s"
# forms are invalid Python escape sequences and raise a warning on recent
# interpreters; the pattern text handed to Spark is unchanged.
punc_pattern = r"[^\w\s]"   # any character that is neither a word character nor whitespace
space_pattern = r"\s{2,}"   # a run of two or more whitespace characters

# Step 1: drop every character matched by punc_pattern.
mendeley_music_selected_df = mendeley_music_selected_df.withColumn(
    "lyrics_cleaned",
    regexp_replace("lyrics", punc_pattern, ""),
)
# Step 2: collapse whitespace runs to a single space and trim both ends.
mendeley_music_selected_df = mendeley_music_selected_df.withColumn(
    "lyrics_cleaned",
    trim(regexp_replace("lyrics_cleaned", space_pattern, " ")),
)

mendeley_music_selected_df.show(5)

In [None]:
# Sanity check: show rows whose cleaned lyrics still contain a character that is
# neither a word character nor whitespace.
mendeley_music_selected_df.filter(col("lyrics_cleaned").rlike(r'[^\w\s]')).show(5)

    Convert the text to lowercase

In [None]:
# Add a lower-cased copy of the cleaned lyrics as "lyrics_lower".
mendeley_music_selected_df = mendeley_music_selected_df.withColumn(
    "lyrics_lower",
    lower(col("lyrics_cleaned")),
)

mendeley_music_selected_df.show(3)

    Tokenize the lyrics column

In [None]:
# Turn the lower-cased lyrics into a "tokens" array column.
tokenizer = Tokenizer().setInputCol("lyrics_lower").setOutputCol("tokens")
mendeley_music_selected_df = tokenizer.transform(mendeley_music_selected_df)

mendeley_music_selected_df.show(5)

    Remove stop words from the tokens

In [None]:
# Filter stop words out of "tokens", writing the result to "tokens_wo_sw".
remover = StopWordsRemover().setInputCol("tokens").setOutputCol("tokens_wo_sw")
mendeley_music_selected_df = remover.transform(mendeley_music_selected_df)

mendeley_music_selected_df.show(3)

    Stem the words

In [None]:
stemmer_ = SnowballStemmer(language='english')


def _stem_tokens(tokens):
    """Return the Snowball stem of every token, in the original order."""
    return [stemmer_.stem(token) for token in tokens]


# Wrap the stemming function as a Spark UDF that returns an array of strings.
stemming_udf = udf(_stem_tokens, ArrayType(StringType()))
mendeley_music_selected_df = mendeley_music_selected_df.withColumn(
    "words_stemmed",
    stemming_udf(col("tokens_wo_sw")),
)

mendeley_music_selected_df.show(3)

### Feature Engineering

##### Word2Vec

In [None]:
# Train the Word2Vec model.
# The explicit seed pins the embedding initialisation; without it PySpark falls
# back to a default seed derived from Python's hash() of the class name, which
# can differ between kernel sessions.
word2Vec = Word2Vec(
    vectorSize=100,
    minCount=5,
    seed=239375,
    inputCol="words_stemmed",
    outputCol="features",
)
word2Vec_model = word2Vec.fit(mendeley_music_selected_df)

# Extract the features: the fitted model adds the "features" vector column.
mendeley_music_selected_df = word2Vec_model.transform(mendeley_music_selected_df)


mendeley_music_selected_df.show(3)

##### Label Encoding

In [None]:
# Encode the string "genre" column as a numeric "label" column.
indexer = StringIndexer().setInputCol("genre").setOutputCol("label")
mendeley_music_selected_df = (
    indexer
    .fit(mendeley_music_selected_df)
    .transform(mendeley_music_selected_df)
)

mendeley_music_selected_df.show(3)

### Model Development & Evaluation

In [None]:
# 80/20 train/test split with a fixed seed.
train, test = mendeley_music_selected_df.randomSplit([0.8, 0.2], seed=239375)

# Report (row count, column count) for each split.
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print("training set shape: ({}, {})".format(*train_shape))
print("test set shape: ({}, {})".format(*test_shape))

    Logistic Regression Classifier

In [None]:
# Fit logistic regression on the training split.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
)
lr_model = lr.fit(train)

# Score both splits with the fitted model.
y_train_pred, y_test_pred = lr_model.transform(train), lr_model.transform(test)

# Accuracy on each split.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy",
)
train_accuracy, test_accuracy = evaluator.evaluate(y_train_pred), evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

    Random Forest Classifier

In [None]:
# Train a random forest model.
# The explicit seed pins the forest's random sampling; without it PySpark falls
# back to a default seed derived from Python's hash() of the class name, which
# can differ between kernel sessions.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    maxDepth=30,
    numTrees=10,
    maxBins=128,
    seed=239375,
)
rf_model = rf.fit(train)

# Evaluate the model on the training and testing sets.
y_train_pred = rf_model.transform(train)
y_test_pred = rf_model.transform(test)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")
train_accuracy = evaluator.evaluate(y_train_pred)
test_accuracy = evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

### Data Pre-Processing - With Pipeline

In [None]:
# Stage 1 - Cleanser
cleanser = cleanser.Cleanser(inputCol="lyrics", outputCol="lyrics_cleaned")

# Stage 2 - Lower case
lower_ = lower_case.Lower(inputCol="lyrics_cleaned", outputCol="lyrics_lower")

# Stage 3 - Tokenizer
tokenizer = Tokenizer(inputCol="lyrics_lower", outputCol="tokens")

# Stage 4 - Stop words remover
sw_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_wo_sw")

# Stage 5 - Stemmer
stemmer = stemmer.Stemmer(inputCol="tokens_wo_sw", outputCol="words_stemmed")

# Stage 6 - Word2Vec
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol="words_stemmed", outputCol="features")

# Stage 7 - StringIndexer
indexer = StringIndexer(inputCol="genre", outputCol="label")

In [None]:
pipeline = Pipeline(stages=[cleanser,
                            lower_,
                            tokenizer,
                            sw_remover,
                            stemmer,
                            word2Vec,
                            indexer])

In [None]:
# Fit and transform the data to pipeline.
# Start from the raw columns only: if the manual pre-processing cells above have
# run, `mendeley_music_selected_df` already carries "lyrics_cleaned", "tokens",
# "features", "label", etc., and Spark ML's built-in stages refuse to write to an
# output column that already exists.
pipeline_input_df = mendeley_music_selected_df.select(
    "artist_name", "track_name", "release_date", "genre", "lyrics"
)

data_prep_model = pipeline.fit(pipeline_input_df)

preprocessed_df = data_prep_model.transform(pipeline_input_df)

preprocessed_df.show(5)

In [None]:
# Save the data prep model.
# Use write().overwrite() like the other save cells in this notebook: a bare
# save() raises if the target path already exists, which breaks re-runs.
data_prep_model.write().overwrite().save("../model/data_prep")

In [None]:
# 80/20 train/test split of the pipeline output, with a fixed seed.
train, test = preprocessed_df.randomSplit([0.8, 0.2], seed=239375)

# Report (row count, column count) for each split.
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print("training set shape: ({}, {})".format(*train_shape))
print("test set shape: ({}, {})".format(*test_shape))

### Model Selection and Hyperparameter Tuning

    Logistic Regression Classifier

In [None]:
# Base estimator, plus the accuracy metric used for model selection and reporting.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Candidate regularisation strengths.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1]) \
    .build()

# 3-fold cross-validation. The explicit seed pins the fold assignment; without
# it PySpark falls back to a default seed derived from Python's hash() of the
# class name, which can differ between kernel sessions.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=239375)

# Run cross-validation, and choose the best set of parameters.
lr_cv_model = crossval.fit(train)

# Evaluate the selected model on the training and testing sets.
y_train_pred = lr_cv_model.transform(train)
y_test_pred = lr_cv_model.transform(test)

train_accuracy = evaluator.evaluate(y_train_pred)
test_accuracy = evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

In [None]:
# Preview three rows of the training-set predictions.
y_train_pred.show(3)

In [None]:
# Save the cross-validated logistic regression model, replacing any copy
# already stored at this path.
lr_cv_model.write().overwrite().save("../model/logistic_regression/")

    Random Forest Classifier

In [None]:
# Base estimator, plus the accuracy metric used for model selection and reporting.
# Explicit seeds (here and on the CrossValidator) pin the forest's random
# sampling and the fold assignment; without them PySpark falls back to default
# seeds derived from Python's hash() of the class name, which can differ between
# kernel sessions.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=239375)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Candidate forest sizes and tree depths (4 x 3 = 12 combinations).
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5, 7, 10]) \
    .addGrid(rf.maxDepth, [4, 8, 10]) \
    .build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=239375)

# Run cross-validation, and choose the best set of parameters.
rf_cv_model = crossval.fit(train)

# Evaluate the selected model on the training and testing sets.
y_train_pred = rf_cv_model.transform(train)
y_test_pred = rf_cv_model.transform(test)

train_accuracy = evaluator.evaluate(y_train_pred)
test_accuracy = evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

In [None]:
# Save the cross-validated random forest model, replacing any copy already
# stored at this path.
rf_cv_model.write().overwrite().save("../model/random_forest/")

### Predict 8 Classes

In [6]:
# Stage 1 - Cleanser
# The stage instance gets its own name (`cleanser_stage`) so the imported
# `cleanser` module is not overwritten by this cell; rebinding the module name
# made `cleanser.Cleanser(...)` fail the second time the cell was executed.
cleanser_stage = cleanser.Cleanser(inputCol="lyrics", outputCol="lyrics_cleaned")

# Stage 2 - Lower case
lower_ = lower_case.Lower(inputCol="lyrics_cleaned", outputCol="lyrics_lower")

# Stage 3 - Tokenizer
tokenizer = Tokenizer(inputCol="lyrics_lower", outputCol="tokens")

# Stage 4 - Stop words remover
sw_remover = StopWordsRemover(inputCol="tokens", outputCol="tokens_wo_sw")

# Stage 5 - Stemmer
# Same reasoning as stage 1: keep the imported `stemmer` module intact.
stemmer_stage = stemmer.Stemmer(inputCol="tokens_wo_sw", outputCol="words_stemmed")

# Stage 6 - Word2Vec
# Explicit seed so the learned embeddings do not depend on PySpark's default
# seed, which is derived from Python's hash() of the class name and can differ
# between kernel sessions.
word2Vec = Word2Vec(vectorSize=100, minCount=5, seed=239375, inputCol="words_stemmed", outputCol="features")

# Label indexing is deliberately not a stage here; it is done in a later cell.
pipeline = Pipeline(stages=[cleanser_stage,
                            lower_,
                            tokenizer,
                            sw_remover,
                            stemmer_stage,
                            word2Vec
                            ])

# Fit and transform the data to pipeline
data_prep_model = pipeline.fit(merged_df)
preprocessed_df = data_prep_model.transform(merged_df)

# Save the data prep model, replacing any copy already stored at this path
data_prep_model.write().overwrite().save("../model/8-classes/data_prep/")

preprocessed_df.show(5)

+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|      lyrics_cleaned|        lyrics_lower|              tokens|        tokens_wo_sw|       words_stemmed|            features|
+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|hold time feel br...|[hold, time, feel...|[hold, time, feel...|[hold, time, feel...|[0.07735481149713...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|believe drop rain...|[believe, drop, r...|[believe, drop

In [7]:
# Label encode the genre column
indexer = StringIndexer(inputCol="genre", outputCol="label")
# Keep a handle on the fitted model: it, not the unfitted estimator, holds the
# learned genre <-> index mapping.
indexer_model = indexer.fit(preprocessed_df)
preprocessed_df = indexer_model.transform(preprocessed_df)

# Unfitted estimator (parameters only), kept at its original path so anything
# that already loads it keeps working.
indexer.write().overwrite().save("../model/8-classes/string_indexer/")
# Fitted model: its `labels` are what is needed to map predicted indices back
# to genre names at inference time.
indexer_model.write().overwrite().save("../model/8-classes/string_indexer_model/")

preprocessed_df.show(3)

+-------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|  artist_name|          track_name|release_date|genre|              lyrics|      lyrics_cleaned|        lyrics_lower|              tokens|        tokens_wo_sw|       words_stemmed|            features|label|
+-------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|       mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|hold time feel br...|[hold, time, feel...|[hold, time, feel...|[hold, time, feel...|[0.07735481149713...|  0.0|
|frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|believe drop rain...|[believe, drop, r...|[believe, drop, r...|[bel

In [None]:
# Disabled cell: everything below is wrapped in a string literal, so none of it
# executes. It sketches converting "label" back to a string column with
# IndexToString and saving that converter.
"""
# create an IndexToString transformer to convert the index back to the original string value
converter = IndexToString(inputCol="label", outputCol="genre_new")
preprocessed_df = converter.transform(preprocessed_df)

converter.write().overwrite().save("../model/8-classes/index_converter/")

preprocessed_df.show(3)
"""

In [8]:
# 80/20 train/test split of the labelled 8-class data, with a fixed seed.
train, test = preprocessed_df.randomSplit([0.8, 0.2], seed=239375)

# Report (row count, column count) for each split.
train_shape = (train.count(), len(train.columns))
test_shape = (test.count(), len(test.columns))
print("training set shape: ({}, {})".format(*train_shape))
print("test set shape: ({}, {})".format(*test_shape))

training set shape: (22835, 12)
test set shape: (5724, 12)


    Logistic Regression Classifier

In [9]:
# Base estimator, plus the accuracy metric used for model selection and reporting.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Candidate regularisation strengths.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1, 10]) \
    .build()

# 3-fold cross-validation. The explicit seed pins the fold assignment; without
# it PySpark falls back to a default seed derived from Python's hash() of the
# class name, which can differ between kernel sessions.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=239375)

# Run cross-validation, and choose the best set of parameters.
lr_cv_model = crossval.fit(train)

# Evaluate the selected model on the training and testing sets.
y_train_pred = lr_cv_model.transform(train)
y_test_pred = lr_cv_model.transform(test)

train_accuracy = evaluator.evaluate(y_train_pred)
test_accuracy = evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

Training Set Accuracy = 0.3529
Test Set Accuracy = 0.3368


In [10]:
# Save the cross-validated 8-class logistic regression model, replacing any
# copy already stored at this path.
lr_cv_model.write().overwrite().save("../model/8-classes/logistic_regression")

In [11]:
# Preview three rows of the training-set predictions.
y_train_pred.show(3)

+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|         artist_name|          track_name|release_date|genre|              lyrics|      lyrics_cleaned|        lyrics_lower|              tokens|        tokens_wo_sw|       words_stemmed|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|"""weird al"" yan...|(this song's just...|        1988|  pop|song word long so...|song word long so...|song word long so...|[song, word, long...|[song, word, long...|[song, word, long...|[0.04

In [12]:
# Score every row of the preprocessed data (train and test together) with the
# cross-validated logistic regression model.
y_all_pred = lr_cv_model.transform(preprocessed_df)

# Collect the full prediction frame to the driver as a pandas DataFrame.
y_all_pred_df = y_all_pred.toPandas()

# Write the predictions to CSV without the pandas index column.
y_all_pred_df.to_csv("../data/lr_pred_all.csv", index=False)

    Random Forest Classifier

In [None]:
# Base estimator, plus the accuracy metric used for model selection and reporting.
# Explicit seeds (here and on the CrossValidator) pin the forest's random
# sampling and the fold assignment; without them PySpark falls back to default
# seeds derived from Python's hash() of the class name, which can differ between
# kernel sessions.
rf = RandomForestClassifier(featuresCol="features", labelCol="label", seed=239375)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

# Candidate forest sizes and tree depths (4 x 3 = 12 combinations).
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [3, 5, 7, 10]) \
    .addGrid(rf.maxDepth, [4, 8, 10]) \
    .build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=239375)

# Run cross-validation, and choose the best set of parameters.
rf_cv_model = crossval.fit(train)

# Evaluate the selected model on the training and testing sets.
y_train_pred = rf_cv_model.transform(train)
y_test_pred = rf_cv_model.transform(test)

train_accuracy = evaluator.evaluate(y_train_pred)
test_accuracy = evaluator.evaluate(y_test_pred)

print("Training Set Accuracy = {:.4f}".format(train_accuracy))
print("Test Set Accuracy = {:.4f}".format(test_accuracy))

In [None]:
# Save the cross-validated 8-class random forest model, replacing any copy
# already stored at this path.
rf_cv_model.write().overwrite().save("../model/8-classes/random_forest")