<a href="https://colab.research.google.com/github/basadhi/music_genre/blob/main/Music_Genre_Classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
pip install pyspark pandas matplotlib streamlit



**Creating Spark session and MLlib pipeline**

In [50]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
from pyspark.sql.functions import col

In [31]:
mendely_df = spark.read.csv("/content/drive/MyDrive/tcc_ceds_music.csv", header=True, inferSchema=True)
mendely_df.show()

+---+--------------------+--------------------+------------+-----+--------------------+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+----------+------------------+
|_c0|         artist_name|          track_name|release_date|genre|              lyrics|len|              dating|            violence|          world/life|          night/time|  shake the audience|       family/gospel|            romantic|       communication|             obscene|               music|     movement/places|light/visual perceptions|    family/spiritual|          like/girls|             sadness|       

In [32]:
# Select only the relevant columns
mendeley_df = mendely_df.select('artist_name', 'track_name', 'release_date', 'genre', 'lyrics')

mendeley_df.show(3)

+-------------+--------------------+------------+-----+--------------------+
|  artist_name|          track_name|release_date|genre|              lyrics|
+-------------+--------------------+------------+-----+--------------------+
|       mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|
|frankie laine|           i believe|        1950|  pop|believe drop rain...|
|  johnnie ray|                 cry|        1950|  pop|sweetheart send l...|
+-------------+--------------------+------------+-----+--------------------+
only showing top 3 rows



In [33]:
spark = SparkSession.builder.appName("MusicGenreClassifier").getOrCreate()

In [34]:
# Index Labels (Genres)
label_indexer = StringIndexer(inputCol="genre", outputCol="label")

In [35]:
from pyspark.sql.functions import regexp_replace, lower
#remove punctuations and lowercase
def clean_lyrics(df):
    df = df.withColumn('clean_lyrics', lower(regexp_replace('lyrics', '[^\w\s]', '')))
    return df

In [36]:
#remove numbers
def remove_numbers(df):
    df = df.withColumn('clean_lyrics', regexp_replace('clean_lyrics', '\d+', ''))
    return df

In [37]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol='clean_lyrics', outputCol='words')


In [38]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')

In [39]:
from nltk.stem.porter import PorterStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

stemmer = PorterStemmer() #reducing words into base forms

def stem_words(words):
    return [stemmer.stem(word) for word in words]

stem_udf = udf(stem_words, ArrayType(StringType()))


In [40]:
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=100, minCount=1, inputCol="stemmed_words", outputCol="features")

In [41]:
from pyspark.ml.classification import RandomForestClassifier

classifier = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)

In [42]:
#encode genre labels
from pyspark.ml.feature import StringIndexer

label_indexer = StringIndexer(inputCol='genre', outputCol='label')

**Pipeline**

In [46]:
cleaned_df = clean_lyrics(mendeley_df)
cleaned_df = remove_numbers(cleaned_df)

In [47]:
train_data, test_data = cleaned_df.randomSplit([0.8, 0.2], seed=42)

In [48]:
# 1. Tokenize
train_data = tokenizer.transform(train_data)
test_data = tokenizer.transform(test_data)

In [49]:
# 2. Remove stopwords
train_data = remover.transform(train_data)
test_data = remover.transform(test_data)

In [51]:
# 3. Apply stemming manually
train_data = train_data.withColumn("stemmed_words", stem_udf(col("filtered_words")))
test_data = test_data.withColumn("stemmed_words", stem_udf(col("filtered_words")))

In [52]:
from pyspark.ml import Pipeline

#lyrics → cleanser → numerator → tokenizer → stop word remover → exploder → stemmer → uniter → verser → word2vec → model



# 4. Build pipeline
pipeline = Pipeline(stages=[
    word2Vec,
    label_indexer,
    classifier
])


In [53]:
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

In [54]:
predictions.select('artist_name', 'track_name', 'genre', 'prediction').show(5)

+--------------------+--------------------+-----+----------+
|         artist_name|          track_name|genre|prediction|
+--------------------+--------------------+-----+----------+
|"""weird al"" yan...|christmas at grou...|  pop|       0.0|
|"""weird al"" yan...|       king of suede|  pop|       0.0|
|"""weird al"" yan...|mr. frump in the ...|  pop|       0.0|
|"""weird al"" yan...|your horoscope fo...|  pop|       0.0|
|"bobby ""blue"" b...|get your money wh...|blues|       0.0|
+--------------------+--------------------+-----+----------+
only showing top 5 rows



In [55]:
model.save("/path/to/save/my_pipeline_model")

In [56]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [57]:
save_path = "/content/drive/My Drive/pyspark_models/my_pipeline_model"
model.write().overwrite().save(save_path)

In [58]:
!zip -r /content/drive/My\ Drive/pyspark_models/my_pipeline_model.zip /content/drive/My\ Drive/pyspark_models/my_pipeline_model

  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/ (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/metadata/ (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/metadata/part-00000 (deflated 22%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/metadata/.part-00000.crc (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/metadata/_SUCCESS (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/metadata/._SUCCESS.crc (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/stages/ (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/stages/0_Word2Vec_eb68388d227a/ (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/stages/0_Word2Vec_eb68388d227a/metadata/ (stored 0%)
  adding: content/drive/My Drive/pyspark_models/my_pipeline_model/stages/0_Word2Vec_eb68388d227a/metadata/part-00000 

Evaluate

In [59]:
from pyspark.ml import PipelineModel
load_path = "/content/drive/My Drive/pyspark_models/my_pipeline_model"
loaded_model = PipelineModel.load(load_path)

In [60]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test set accuracy: {accuracy:.2f}")


Test set accuracy: 0.26


# Dataset