In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, lower, col, monotonically_increasing_id, explode, udf
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, StringIndexer
from pyspark.ml.classification import LogisticRegression

from pyspark.sql.types import ArrayType, StringType
import nltk
from nltk.stem import PorterStemmer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
spark = SparkSession.builder.appName("MusicClassification").getOrCreate()

In [None]:
df = spark.read.csv("/content/drive/MyDrive/msc-ml/datasets/merged_df.csv", header=True , inferSchema=True)

# Data Cleaning
df = df.withColumn("clean_lyrics", lower(col("lyrics")))  # Convert to lowercase
df = df.withColumn("clean_lyrics", regexp_replace(col("clean_lyrics"), "[^a-zA-Z\\s]", ""))  # Remove special characters
df = df.withColumn("index", monotonically_increasing_id())  # Add unique index column

# Label Encoding (Convert genre to numerical label)
label_indexer = StringIndexer(inputCol="genre", outputCol="label")
label_indexer_model = label_indexer.fit(df)
df = label_indexer_model.transform(df)

# Tokenization
tokenizer = Tokenizer(inputCol="clean_lyrics", outputCol="words")
df = tokenizer.transform(df)

# Stopword Removal
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
df = stop_words_remover.transform(df)

# Stemming using NLTK
nltk.download("punkt")
stemmer = PorterStemmer()
stem_udf = udf(lambda words: [stemmer.stem(word) for word in words], ArrayType(StringType()))
df = df.withColumn("stemmed_words", stem_udf(col("filtered_words")))

# Word2Vec Feature Extraction
word2Vec = Word2Vec(vectorSize=100, minCount=2, inputCol="stemmed_words", outputCol="features")
word2Vec_model = word2Vec.fit(df)
df = word2Vec_model.transform(df)

# Split the data into training and test sets (80/20 split)
train_data, test_data = df.randomSplit([0.8, 0.2], seed=239375)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
word2vec_model_path = "/content/drive/MyDrive/ml-datasets/saved-model/tcc_ceds_music/word2vec-combined-df-v2"

In [None]:
word2Vec_model.save(word2vec_model_path)

In [None]:
label_names = label_indexer_model.labels
print("Label Names:", label_names)

Label Names: ['pop', 'country', 'blues', 'rock', 'jazz', 'reggae', 'hip hop', 'soul']


In [None]:
from datetime import datetime
from pyspark.ml.classification import RandomForestClassifier

In [None]:
lr_model_path = '/content/drive/MyDrive/ml-datasets/saved-model/tcc_ceds_music/saved_model/logistic_regression-combined-df-v2'

# LR

In [None]:
logisticRegression = LogisticRegression(featuresCol="features", labelCol="label")

# Hyperparameter Tuning
paramGrid = (ParamGridBuilder()
    .addGrid(logisticRegression.regParam, [0.01, 0.1, 0.5])  # Regularization parameter
    .addGrid(logisticRegression.maxIter, [50, 100, 200])     # Max iterations
    .build())

# Evaluator for Model Selection
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Cross Validation for Model Tuning
crossval = CrossValidator(estimator=logisticRegression,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Start time
start_time = datetime.now()
print(f"Training started at: {start_time}")

# Train model with hyperparameter tuning
model = crossval.fit(train_data)

# End time
end_time = datetime.now()
print(f"Training ended at: {end_time}")

# Evaluate the model on test data
predictions = model.transform(test_data)
accuracy = evaluator.evaluate(predictions)
print(f"Logistic Regression Test Accuracy: {accuracy}")

# Save the model
model.bestModel.save(lr_model_path)

Training started at: 2025-03-25 19:19:15.481725
Training ended at: 2025-03-25 19:28:31.283433
Logistic Regression Test Accuracy: 0.35353535353535354


## LR Check Results

In [None]:
from pyspark.sql.functions import col

predictions = model.transform(train_data)

# Filter records where prediction matches the actual label
correct_predictions = predictions.filter(col("prediction") == col("label"))

# Select the required columns
correct_predictions = correct_predictions.select("index", "lyrics", "prediction", "label")
correct_predictions.toPandas().to_csv('./lr_correct_predictions_train.csv', index=False)

In [None]:
from pyspark.sql.functions import col

# Get predictions on train data
predictions = model.transform(train_data)

# Check if the model outputs probability (some models don't, like LinearSVC)
if "probability" in predictions.columns:
    selected_cols = ["index", "track_name", "lyrics", "original_label", "predicted_label", "probability"]
else:
    selected_cols = ["index", "track_name", "lyrics", "original_label", "predicted_label"]

# Filter records where prediction matches actual label
correct_predictions = predictions.filter(col("prediction") == col("label"))

# Rename label column in predictions to avoid ambiguity
correct_predictions = correct_predictions.withColumnRenamed("label", "predicted_label")

# Join original df with correct predictions based on index
correct_original_data = df.join(correct_predictions.select("index", "predicted_label", "probability") if "probability" in predictions.columns
                                else correct_predictions.select("index", "predicted_label"),
                                on="index", how="inner")

# Rename original label column to avoid confusion
correct_original_data = correct_original_data.withColumnRenamed("label", "original_label")

# Select only required columns
correct_original_data = correct_original_data.select(selected_cols)

# Write to CSV
correct_original_data.toPandas().to_csv('./lr_correct_predictions_train-new-v2.csv', index=False)

## LR Predict


In [None]:
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml.feature import Word2VecModel

In [None]:
# prompt: import LogisticRegressionModel

from pyspark.ml.classification import LogisticRegressionModel
loaded_model = LogisticRegressionModel.load(lr_model_path)

In [None]:
spark = SparkSession.builder.appName("MusicClassification-app").getOrCreate()


def preprocess_lyrics(lyrics):
    df = spark.createDataFrame([(lyrics,)], ["lyrics"])
    df = df.withColumn("clean_lyrics", lower(col("lyrics")))
    df = df.withColumn("clean_lyrics", regexp_replace(col("clean_lyrics"), "[^a-zA-Z\\s]", ""))

    tokenizer = Tokenizer(inputCol="clean_lyrics", outputCol="words")
    df = tokenizer.transform(df)

    stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    df = stop_words_remover.transform(df)

    nltk.download("punkt")
    stemmer = PorterStemmer()
    stem_udf = udf(lambda words: [stemmer.stem(word) for word in words], ArrayType(StringType()))
    df = df.withColumn("stemmed_words", stem_udf(col("filtered_words")))

    word2Vec_model = Word2VecModel.load(word2vec_model_path)
    df = word2Vec_model.transform(df)

    return df

# Function to predict probabilities
def predict_lyrics(lyrics):
    lr_model = LogisticRegressionModel.load(lr_model_path)
    processed_df = preprocess_lyrics(lyrics)
    predictions = lr_model.transform(processed_df)
    probabilities = predictions.select("probability").collect()[0]["probability"]
    return probabilities

In [None]:
song_lyr = "scream break silence wake dead night vengence boil return kill light look listen hear bark moon years spend torment bury nameless grave rise lyric commercial"
pred_res = predict_lyrics(song_lyr)
pred_res

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


DenseVector([0.109, 0.0311, 0.0591, 0.7357, 0.0301, 0.0164, 0.0182, 0.0005])

# XG Boost

In [None]:
from xgboost.spark import SparkXGBClassifier

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from xgboost.spark import SparkXGBClassifier

# Define the XGBoost classifier
xgb_classifier = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    num_classes=8,
    max_depth=6,
    eta=0.1,
    num_round=100
)

# Define hyperparameter grid
paramGrid = (ParamGridBuilder()
    .addGrid(xgb_classifier.max_depth, [3, 6, 9])  # Tree depth
    .addGrid(xgb_classifier.learning_rate, [0.01, 0.1, 0.3])  # Learning rate
    .addGrid(xgb_classifier.n_estimators, [50, 100, 200])  # Boosting rounds
    .build())

# Define evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

# Define Cross Validator
crossval = CrossValidator(
    estimator=xgb_classifier,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3  # 3-fold cross-validation
)

# Train model with hyperparameter tuning
cv_model = crossval.fit(train_data)

# Best model after tuning
best_model = cv_model.bestModel

# Make predictions
predictions = best_model.transform(test_data)

# Define the evaluator for multi-class classification
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Calculate accuracy
accuracy = evaluator.evaluate(predictions)
print(f"XGBoost Model Accuracy: {accuracy}")

INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.01, 'max_depth': 3, 'objective': 'multi:softprob', 'num_classes': 8, 'eta': 0.1, 'num_round': 100, 'num_class': 8, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 50}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.01, 'max_depth': 3, 'objective': 'multi:softprob', 'num_classes': 8, 'eta': 0.1, 'num_round': 100, 'num_class': 8, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!
INFO:XGBoost-PySpark:Running xgboost-2.1.4 on 1 workers with
	booster params: {'device': 'cpu', 'learning_rate': 0.01, 'max_depth': 3, 'objective': 'multi:softprob', 'num_cl

XGBoost Model Accuracy: 0.37861372344130967


In [None]:
model_save_path = "/content/drive/MyDrive/ml-datasets/saved-model/tcc_ceds_music/saved_model/xg-boosts-combined-df-v2"

In [None]:

# Save the best model in the specified folder
best_model.write().overwrite().save(model_save_path)


## Predict using XG Boost Model

In [None]:
from xgboost.spark import SparkXGBClassifierModel
from pyspark.ml.feature import Word2VecModel

# Load the saved model
loaded_model = SparkXGBClassifierModel.load(model_save_path)
word2Vec_model = Word2VecModel.load(word2vec_model_path)
nltk.download("punkt")
tokenizer = Tokenizer(inputCol="clean_lyrics", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
stemmer = PorterStemmer()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [None]:
spark = SparkSession.builder.appName("MusicClassification-app").getOrCreate()


def preprocess_lyrics(lyrics):
    df = spark.createDataFrame([(lyrics,)], ["lyrics"])
    df = df.withColumn("clean_lyrics", lower(col("lyrics")))
    df = df.withColumn("clean_lyrics", regexp_replace(col("clean_lyrics"), "[^a-zA-Z\\s]", ""))

    df = tokenizer.transform(df)

    df = stop_words_remover.transform(df)

    stem_udf = udf(lambda words: [stemmer.stem(word) for word in words], ArrayType(StringType()))
    df = df.withColumn("stemmed_words", stem_udf(col("filtered_words")))

    df = word2Vec_model.transform(df)

    return df

# Function to predict probabilities
def predict_lyrics(lyrics):
    processed_df = preprocess_lyrics(lyrics)
    predictions = loaded_model.transform(processed_df)
    probabilities = predictions.select("probability").collect()[0]["probability"]
    return probabilities

In [None]:
song_lyr = "When Mama said that it was okay Mama said that it was quite alright Our kind of people had a bed for the night And it was okay Mama told us we were good kids And Daddy told us never listen to the ones Pointing nasty fingers and making fun 'Cause we were good kids Remember asking both my mom and dad Why we never travelled to exotic lands We only ever really visit friends Nothing to tell when the summer ends We never really went buying clothes Folks were passing on the stuff in plenty loads New shoes once a year and then Out to play ball so we could ruin them When Mama said that it was okay Mama said that it was quite alright Our kind of people had a bed for the night And it was okay Mama told us we were good kids And Daddy told us never listen to the ones Pointing nasty fingers and making fun 'Cause we were good kids Don’t get me wrong, I didn’t have it bad I got enough loving from my mom and dad But I don’t think they really understood When I said that I wanted a deal in Hollywood I told them I'd be singing on TV The other kids were calling me a wannabe The older kids, they started bugging me But now they're all standing right in front of me When Mama said that it was okay Mama said that it was quite alright Our kind of people had a bed for the night And it was okay Mama told us we were good kids And Daddy told us never listen to the ones Pointing nasty fingers and making fun 'Cause we were good kids I know which place I'm from I know my home When I’m in doubt and struggling That’s where I go An old friend can give advice When new friends only know a half story That’s why I always keep 'em tight And why I'm okay I said I'm okay You know what my mama said? You know what she told me? When Mama said that it was okay Mama said that it was quite alright Our kind of people had a bed for the night And it was okay Mama told us we were good kids And Daddy told us never listen to the ones Pointing nasty fingers and making fun 'Cause we were good kids When Mama said that it was okay Dabdadadabdadad bdaaa... Dabdadadabdadad bdaaa... Dabdadabdadadbdadabdaaaaa... When Mama said that it was okay Dabdadadabdadad bdaaa... Dabdadadabdadad bdaaa... Dabdadabdadadbdadabdaaaaa... When Mama said that it was okay"
pred_res = predict_lyrics(song_lyr)
pred_res

DenseVector([0.0106, 0.0212, 0.0107, 0.0035, 0.0039, 0.007, 0.0002, 0.9428])