<a href="https://colab.research.google.com/github/hashithacd/spark_genre_classification/blob/main/Project_jonre_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
# !pip install pyspark
# !pip install py4j
# !pip show pyspark

In [14]:
from google.colab import drive

# Attach Google Drive to the Colab filesystem under /content/drive.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [15]:
import pandas as pd
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer, IndexToString
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession


In [16]:
def get_spark_session(app_name='project1', executor_memory='8g', driver_memory='2g'):
    """Return a SparkSession configured for this notebook (creating it if needed).

    Args:
        app_name: Spark application name.
        executor_memory: value for ``spark.executor.memory``.
        driver_memory: value for ``spark.driver.memory``.

    Returns:
        The SparkSession produced by ``SparkSession.builder.getOrCreate()``.

    NOTE: ``getOrCreate`` reuses an already-running session, so the memory
    settings presumably only take effect on the first call in a kernel.
    For S3 access, add the ``spark.hadoop.fs.s3a.*`` settings here and read the
    keys from environment variables rather than hard-coding them.
    """
    builder = (
        SparkSession.builder
        .appName(app_name)
        .config('spark.executor.memory', executor_memory)
        .config('spark.driver.memory', driver_memory)
    )
    return builder.getOrCreate()

sc = get_spark_session()

In [17]:
def read_data(path="/content/Mendeley_dataset.csv",
              columns=("artist_name", "track_name", "release_date", "genre", "lyrics"),
              spark=None):
    """Load the lyrics CSV and keep only the columns this notebook uses.

    Args:
        path: CSV file with a header row.
        columns: names of the columns to keep, in output order.
        spark: SparkSession to read with; defaults to the notebook-level ``sc``.

    Returns:
        Spark DataFrame restricted to ``columns`` (schema inferred from the CSV).
    """
    session = spark if spark is not None else sc
    raw_df = session.read.csv(path, header=True, inferSchema=True)
    return raw_df.select(*columns)

mendeley_df = read_data()

In [18]:
def training_pipeline(df, split_weights=(0.8, 0.2), seed=123):
    """Fit a TF-IDF + Naive Bayes genre classifier on a random split of ``df``.

    Stages: tokenize ``lyrics`` -> remove stop words -> hash tokens into
    10,000 term-frequency buckets -> IDF rescaling -> index ``genre`` into
    ``genre_index`` -> NaiveBayes.

    Args:
        df: Spark DataFrame with at least ``lyrics`` and ``genre`` columns.
            Rows where either is null are dropped, so null values never reach
            the Tokenizer or the StringIndexer.
        split_weights: (train, test) weights passed to ``randomSplit``.
        seed: seed for ``randomSplit`` so the split is reproducible.

    Returns:
        ``(training_model, genre_mapper, test_data)``:
        ``training_model`` is the fitted PipelineModel;
        ``genre_mapper`` maps each genre name to the float label index that
        ``training_model`` predicts;
        ``test_data`` is the held-out, untransformed split.
    """
    clean_df = df.na.drop(subset=["lyrics", "genre"])

    # Feature extraction
    tokenizer = Tokenizer(inputCol="lyrics", outputCol="words")
    stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered_words")
    hashing_tf = HashingTF(inputCol=stop_words_remover.getOutputCol(), outputCol="raw_features", numFeatures=10000)
    idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features")

    # Label indexing + classifier. To compare models, swap `nb` for another
    # pyspark.ml classifier (e.g. RandomForestClassifier) with the same columns.
    genre_indexer = StringIndexer(inputCol="genre", outputCol="genre_index")
    nb = NaiveBayes(labelCol="genre_index", featuresCol="features")

    stages = [tokenizer, stop_words_remover, hashing_tf, idf, genre_indexer, nb]
    pipeline = Pipeline(stages=stages)

    training_data, test_data = clean_df.randomSplit(list(split_weights), seed=seed)
    training_model = pipeline.fit(training_data)

    # Build the label mapping from the StringIndexerModel *inside* the fitted
    # pipeline, so it is guaranteed to match the indices the model predicts.
    # A second indexer fitted on the full dataset can order labels differently:
    # by default StringIndexer orders by frequency, and the training split's
    # frequencies differ from the full data's.
    indexer_model = training_model.stages[stages.index(genre_indexer)]
    if hasattr(indexer_model, "labelsArray"):  # Spark >= 3.0.2
        labels = indexer_model.labelsArray[0]
    else:
        labels = indexer_model.labels
    genre_mapper = {label: float(index) for index, label in enumerate(labels)}

    return training_model, genre_mapper, test_data


traningModel, class_mapper, test_frame = training_pipeline(mendeley_df)



In [19]:

def model_evaluation(model, test_df, metric_name="accuracy", label_col="genre_index"):
    """Score ``test_df`` with ``model``, print one metric, and return the predictions.

    Args:
        model: fitted transformer that adds a ``prediction`` column.
        test_df: held-out Spark DataFrame containing whatever columns ``model``
            needs to produce both ``label_col`` and ``prediction``.
        metric_name: a MulticlassClassificationEvaluator metric name such as
            "accuracy" (default), "f1" or "weightedPrecision". The value is
            printed as a percentage, so fraction-valued metrics fit best.
        label_col: column holding the true numeric label.

    Returns:
        The transformed DataFrame, for further inspection.
    """
    predictions_df = model.transform(test_df)
    evaluator = MulticlassClassificationEvaluator(
        labelCol=label_col, predictionCol="prediction", metricName=metric_name
    )
    score = evaluator.evaluate(predictions_df)
    print("Test set {} = {:.2f}%".format(metric_name, score * 100))
    return predictions_df

prediction_frame = model_evaluation(traningModel, test_frame)

Test set accuracy = 33.35%


In [20]:
# Display the genre-name -> label-index mapping returned by training_pipeline.
class_mapper

{'pop': 0.0,
 'blues': 2.0,
 'jazz': 4.0,
 'country': 1.0,
 'hip hop': 6.0,
 'reggae': 5.0,
 'rock': 3.0}

In [21]:
# Read the sample lyrics as UTF-8 explicitly. The platform default encoding is
# not guaranteed to be UTF-8, and lyrics may contain non-ASCII characters.
with open('lyrics.txt', 'r', encoding='utf-8') as f:
    lyrics = f.read()

# One-row frame whose single column matches the pipeline Tokenizer's inputCol.
test_lyrics = pd.DataFrame({'lyrics': [lyrics]})

test_lyrics

                                              lyrics
0  [Intro: Future]\nIf Young Metro don't trust yo...


In [22]:
# Lift the one-row pandas frame into Spark so the fitted pipeline can score it.
pyspark_df = sc.createDataFrame(data=test_lyrics)
# Apply every fitted stage, from tokenization through the classifier.
predictions_df = traningModel.transform(dataset=pyspark_df)

In [23]:
# define a function to get the key from the value
def get_key(val, mapper=None):
    """Return the genre name whose label index equals ``val``.

    Args:
        val: numeric label index, e.g. a model ``prediction`` such as 5.0.
        mapper: dict mapping genre name -> label index; defaults to the
            notebook-level ``class_mapper``.

    Raises:
        KeyError: if no genre maps to ``val``. This is raised explicitly so an
            unknown label gives a clear error instead of a bare StopIteration.
    """
    mapping = class_mapper if mapper is None else mapper
    for key, value in mapping.items():
        if value == val:
            return key
    raise KeyError(val)

# Fetch only the first scored row and read its predicted label index by name.
prediction_value = predictions_df.select("prediction").take(1)[0]["prediction"]
print(prediction_value)
get_key(prediction_value)

5.0


'reggae'

In [27]:
# Persist the fitted PipelineModel, replacing any copy already at this path.
traningModel.write().overwrite().save(path="/content/myModel")

In [28]:
from pyspark.ml import PipelineModel

# Reload the pipeline persisted to /content/myModel and score the same input,
# to check that the saved copy is usable on its own.
loaded_pipeline = PipelineModel.load(path="/content/myModel")
predictions = loaded_pipeline.transform(dataset=pyspark_df)

In [29]:
# Show the scored DataFrame's repr (its column schema; no rows are displayed).
predictions

DataFrame[lyrics: string, words: array<string>, filtered_words: array<string>, raw_features: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [30]:
# get_key is already defined in an earlier cell. Reuse it instead of redefining
# it here, because a second definition would silently shadow the first.
prediction_value = predictions.select("prediction").take(1)[0]["prediction"]
print(prediction_value)
get_key(prediction_value)

5.0


'reggae'