In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml import Transformer
from pyspark.ml.util import MLWritable, MLWriter, DefaultParamsWriter, MLReadable, MLReader, DefaultParamsReader, DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, Word2Vec, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.param.shared import Param, Params
from pyspark.sql.types import StringType, ArrayType, IntegerType
from pyspark.sql.functions import regexp_replace, monotonically_increasing_id, explode, col, udf, lower, concat, concat_ws, array_distinct, collect_list, expr
from nltk.stem import SnowballStemmer
import re

In [None]:
spark = SparkSession.builder.master("spark://ronila-workstation:7077") \
    .appName("LyricsApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.sql.autoBroadcastJoinThreshold", "5MB") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "4g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/29 11:07:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/29 11:07:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


25/03/29 11:07:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 5 * 1024 * 1024)
# spark.conf.set("spark.memory.offHeap.enabled", "true")
# spark.conf.set("spark.memory.offHeap.size", "8g")

In [3]:
print("Spark UI:", spark.sparkContext.uiWebUrl)

Spark UI: http://ronila-workstation:4040


In [None]:
filePath = """original_dataset.csv"""

mendeleyDF = spark.read.csv(filePath, header=True, inferSchema=True)

mendeleyDF.select("artist_name", "track_name", "release_date", "genre", "lyrics").dropna().show(5)

                                                                                

+--------------------+--------------------+------------+-----+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|
+--------------------+--------------------+------------+-----+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|
|giorgos papadopoulos|  apopse eida oneiro|        1950|  pop|till darling till...|
+--------------------+--------------------+------------+-----+--------------------+
only showing top 5 rows



In [None]:
filePath2 = """custom_dataset.csv"""

studentDF = spark.read.csv(filePath2, header=True, inferSchema=True)

studentDF.select("artist_name", "track_name", "release_date", "genre", "lyrics").dropna().show(5)

+--------------+--------------------+------------+------------+--------------------+
|   artist_name|          track_name|release_date|       genre|              lyrics|
+--------------+--------------------+------------+------------+--------------------+
|    Geographer|           The Guest|        2015|classic soul|my days are like ...|
|        Common|                Real|        2014|classic soul|you niggas ain t ...|
|         Ralph|               Tease|        2017|classic soul|i ll confess your...|
|           Joe|Grown Up Christma...|        2009|classic soul|do you remember w...|
|Brian McKnight|O Come, All Ye Fa...|        2008|classic soul|o come all ye fai...|
+--------------+--------------------+------------+------------+--------------------+
only showing top 5 rows



In [5]:
mergedDF = mendeleyDF.union(studentDF)

mergedDF.show(5)

+--------------------+--------------------+------------+-----+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|
+--------------------+--------------------+------------+-----+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|
|giorgos papadopoulos|  apopse eida oneiro|        1950|  pop|till darling till...|
+--------------------+--------------------+------------+-----+--------------------+
only showing top 5 rows



In [None]:
# mergedDF.coalesce(1).write.option("header", "true").csv("combined_dataset.csv")

In [7]:
expDF = mergedDF

In [None]:
# Data Cleaning
# lyricsDF = lyricsDF.withColumn("cleaned_lyrics", regexp_replace(col("lyrics"), "[^a-zA-Z\\s]", ""), StringType())

In [10]:
class Cleanser(Transformer, MLReadable, MLWritable):
    def _transform(self, dataset):
        return dataset.withColumn("cleaned_lyrics", regexp_replace(lower(col("lyrics")), "[^a-zA-Z\\s]", ""))
    
    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter

    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)



cleanser = Cleanser()

In [None]:
expDF = expDF.withColumn("cleaned_lyrics", regexp_replace(lower(col("lyrics")), "[^a-zA-Z\\s]", ""))
expDF.show(5)

In [None]:
# Assign line numbers
# lyricsDF = lyricsDF.withColumn("line_number", monotonically_increasing_id())

In [11]:
class Numerator(Transformer, MLReadable, MLWritable):
    def _transform(self, dataset):
        return dataset.withColumn("line_number", monotonically_increasing_id())
    
    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter

    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)

numerator = Numerator()

In [11]:
expDF = expDF.withColumn("line_number", monotonically_increasing_id())
expDF.show(5)

+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+
|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|line_number|
+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|          1|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|sweetheart send l...|          2|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|kiss lips want st...|          3|
|giorgos papadopoulos|  apopse eida oneiro|        1950|  pop|till darling till...|till darling till...|          4|
+--------------------+--------------------+------------+-----+--

In [9]:
# Tokenization
tokenizer = RegexTokenizer(inputCol="cleaned_lyrics", outputCol="tokens", pattern="\\W")

In [13]:
expDF = tokenizer.transform(expDF)
expDF.show(5)

+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|line_number|              tokens|
+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|          1|[believe, drop, r...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|sweetheart send l...|          2|[sweetheart, send...|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|kiss lips want st...|          3|[kiss, lips, want...|
|giorgos papadopoulos|  apopse eid

In [10]:
# Stopword Removal
stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens")

In [15]:
expDF = stopwords_remover.transform(expDF)
expDF.show(5)

+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+
|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|line_number|              tokens|     filtered_tokens|
+--------------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|          1|[believe, drop, r...|[believe, drop, r...|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|sweetheart send l...|          2|[sweetheart, send...|[sweetheart, send...|
|         pérez prado|            patricia|   

In [None]:
# Exploder
# lyricsDF = lyricsDF.withColumn("words", explode(split(col("cleaned_lyrics"), " ")))
# lyricsDF = lyricsDF.withColumn("words_index", row_number().over(Window.partitionBy("line_number").orderBy(monotonically_increasing_id())))

In [12]:
class Exploder(Transformer, MLReadable, MLWritable):
    def _transform(self, dataset):
        return dataset.withColumn("words", explode(col("filtered_tokens")))
    
    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter

    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)
    
exploder = Exploder()

In [17]:
expDF = expDF.withColumn("words", explode(col("filtered_tokens")))
expDF.show(100)

+-------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+--------+
|  artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|line_number|              tokens|     filtered_tokens|   words|
+-------------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+--------+
|       mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...|    hold|
|       mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...|    time|
|       mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...|    feel|
|       mukesh|mohabbat bhi jhooth

In [None]:
# Stemmer
# class Stemmer(Transformer):
#     def _transform(self, dataset):
#         stemmer_udf = udf(lambda word: SnowballStemmer("english").stem(word), StringType())
#         return dataset.withColumn("stemmed_word", stemmer_udf(dataset["word"]))

In [13]:
class Stemmer(Transformer, MLReadable, MLWritable):
    def _transform(self, dataset):
        remove_suffix_udf = udf(lambda word: re.sub(r'(ing|ed|ly|es|s)$', '', word), StringType())
        return dataset.withColumn("stemmed_word", remove_suffix_udf(col("words")))

    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter
    
    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)

stemmer = Stemmer()

In [19]:
remove_suffix_udf = udf(lambda word: re.sub(r'(ing|ed|ly|es|s)$', '', word), StringType())
expDF = expDF.withColumn("stemmed_word", remove_suffix_udf(col("words")))
expDF.show(5)

[Stage 12:>                                                         (0 + 1) / 1]

+-----------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+-----+------------+
|artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|line_number|              tokens|     filtered_tokens|words|stemmed_word|
+-----------+--------------------+------------+-----+--------------------+--------------------+-----------+--------------------+--------------------+-----+------------+
|     mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...| hold|        hold|
|     mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...| time|        time|
|     mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|          0|[hold, time, feel...|[hold, time, feel...| feel|

                                                                                

In [14]:
# Uniter
class Uniter(Transformer, MLReadable, MLWritable):
    def _transform(self, dataset):
        const_lyrics = dataset.groupBy("line_number").agg(concat_ws(" ", collect_list("stemmed_word")).alias("reconstructed_lyrics")).orderBy("line_number")
        return dataset.drop("words", "stemmed_word").dropDuplicates().orderBy("line_number").join(const_lyrics, "line_number", "right").orderBy("line_number")

    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter
        
    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)

uniter = Uniter()

In [21]:
const_lyrics = expDF.groupBy("line_number") \
    .agg(concat_ws(" ", collect_list("stemmed_word")).alias("reconstructed_lyrics")) \
    .orderBy("line_number")
const_lyrics.show(5)

expDF = expDF.drop("words", "stemmed_word").dropDuplicates().orderBy("line_number").join(const_lyrics, "line_number", "right").orderBy("line_number")
expDF.show(5)

                                                                                

+-----------+--------------------+
|line_number|reconstructed_lyrics|
+-----------+--------------------+
|          0|hold time feel br...|
|          1|believe drop rain...|
|          2|sweetheart send l...|
|          3|kis lip want stro...|
|          4|till darl till ma...|
+-----------+--------------------+
only showing top 5 rows





+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|line_number|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|              tokens|     filtered_tokens|reconstructed_lyrics|
+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|          0|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|[hold, time, feel...|[hold, time, feel...|hold time feel br...|
|          1|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|[believe, drop, r...|[believe, drop, r...|believe drop rain...|
|          2|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|sweet

                                                                                

In [None]:
# grouped_df = expDF.groupBy("line_number") \
#     .agg(concat_ws(" ", collect_list("stemmed_word")).alias("reconstructed_lyrics")) \
#     .orderBy("line_number")
# expDF.join(grouped_df, "line_number", "inner").show(5)

In [None]:
# Verser
# lyricsDF = lyricsDF.withColumn("verse", lyricsDF["collect_list(stemmed)"])

In [15]:
class Verser(Transformer, MLReadable, MLWritable):
    def __init__(self, sentences_in_verse=16):
        super().__init__()
        self.sentences_in_verse = sentences_in_verse

    def _transform(self, dataset):
        def create_verses(lyrics):
            words = lyrics.split()
            verses = [" ".join(words[i:i + self.sentences_in_verse]) for i in range(0, len(words), self.sentences_in_verse)]
            return verses

        verse_udf = udf(create_verses, ArrayType(StringType()))

        return dataset.withColumn("verses", verse_udf(dataset["reconstructed_lyrics"]))
    
    def write(self):
        """Returns an MLWriter instance for saving this ML instance."""
        return DefaultParamsWriter(self) # Use DefaultParamsWriter

    def saveImpl(self, path):
        """Saves the parameters to the input path."""
        DefaultParamsWriter.saveImpl(self, path) # Use DefaultParamsWriter

    @classmethod
    def read(cls):
        """Returns an MLReader instance for loading this ML instance."""
        return DefaultParamsReader(cls)

    @classmethod
    def load(cls, path):
        """Loads the ML instance from the input path."""
        reader = DefaultParamsReader(cls)
        return reader.load(path)
    
verser = Verser()

In [23]:
def create_verses(lyrics):
            words = lyrics.split()
            verses = [" ".join(words[i:i + 16]) for i in range(0, len(words), 16)]
            return verses

verse_udf = udf(create_verses, ArrayType(StringType()))
expDF = expDF.withColumn("verses", verse_udf(expDF["reconstructed_lyrics"]))
expDF.show(5)



+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|line_number|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|              tokens|     filtered_tokens|reconstructed_lyrics|              verses|
+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          0|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|[hold, time, feel...|[hold, time, feel...|hold time feel br...|[hold time feel b...|
|          1|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain...|[believe, drop, r...|[believe, drop, r...|believe drop rain...|[believe drop rai

                                                                                

In [15]:
# Word2Vec
word2Vec = Word2Vec(inputCol="verses", outputCol="features", vectorSize=100, minCount=0)

In [25]:
word2Vec_model = word2Vec.fit(expDF)
expDF = word2Vec_model.transform(expDF)
expDF.show(5)

[Stage 74:>                                                         (0 + 1) / 1]

+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|line_number|         artist_name|          track_name|release_date|genre|              lyrics|      cleaned_lyrics|              tokens|     filtered_tokens|reconstructed_lyrics|              verses|            features|
+-----------+--------------------+--------------------+------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          0|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|hold time feel br...|[hold, time, feel...|[hold, time, feel...|hold time feel br...|[hold time feel b...|[-8.4334748680703...|
|          1|       frankie laine|           i believe|        1950|  pop|believe drop rain...|believe drop rain

                                                                                

In [6]:
# Indexer
label_indexer = StringIndexer(inputCol="genre", outputCol="labelIndex")
indexer_model = label_indexer.fit(mergedDF)
mergedDF = indexer_model.transform(mergedDF)
mergedDF.show(5)

                                                                                

+--------------------+--------------------+------------+-----+--------------------+----------+
|         artist_name|          track_name|release_date|genre|              lyrics|labelIndex|
+--------------------+--------------------+------------+-----+--------------------+----------+
|              mukesh|mohabbat bhi jhoothi|        1950|  pop|hold time feel br...|       0.0|
|       frankie laine|           i believe|        1950|  pop|believe drop rain...|       0.0|
|         johnnie ray|                 cry|        1950|  pop|sweetheart send l...|       0.0|
|         pérez prado|            patricia|        1950|  pop|kiss lips want st...|       0.0|
|giorgos papadopoulos|  apopse eida oneiro|        1950|  pop|till darling till...|       0.0|
+--------------------+--------------------+------------+-----+--------------------+----------+
only showing top 5 rows



In [7]:
for label, index in zip(indexer_model.labels, range(len(indexer_model.labels))):
    print(f"Index: {index}, Genre: {label}")

Index: 0, Genre: pop
Index: 1, Genre: country
Index: 2, Genre: blues
Index: 3, Genre: rock
Index: 4, Genre: jazz
Index: 5, Genre: reggae
Index: 6, Genre: hip hop
Index: 7, Genre: classic soul


In [18]:
# Regressor
# rfRegressor = RandomForestClassifier(featuresCol="features", labelCol="labelIndex", numTrees=250)
logisticRegression = LogisticRegression(maxIter=100, regParam=0.01, featuresCol="features", labelCol="labelIndex")

In [19]:
pipeline = Pipeline(stages=[
    cleanser,
    numerator,
    tokenizer,
    stopwords_remover,
    exploder,
    stemmer,
    uniter,
    verser,
    word2Vec,
    logisticRegression
])

In [8]:
# Train-test split
train_data, test_data = mergedDF.randomSplit([0.8, 0.2], seed=42)

In [21]:
# --- Parameter Grid for Cross-Validation ---
paramGrid = ParamGridBuilder() \
    .addGrid(word2Vec.vectorSize, [100, 200, 300]) \
    .addGrid(logisticRegression.regParam, [0.01, 0.05]) \
    .addGrid(logisticRegression.maxIter, [100, 200, 300]) \
    .build()

# --- Cross-Validator ---
crossValidator = CrossValidator() \
    .setEstimator(pipeline) \
    .setEstimatorParamMaps(paramGrid) \
    .setEvaluator(MulticlassClassificationEvaluator(labelCol="labelIndex", predictionCol="prediction", metricName="accuracy")) \
    .setNumFolds(3)

In [22]:
model = pipeline.fit(train_data)

25/03/29 03:24:39 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
                                                                                

In [None]:
# cv_model = crossValidator.fit(train_data)

In [23]:
# bestModel = model.bestModel

savePath = "./model"
# bestModel.save(savePath)
model.write().overwrite().save(savePath)

25/03/29 03:25:22 WARN TaskSetManager: Stage 183 contains a task of very large size (10303 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [None]:
# Load the saved model
from pyspark.ml.pipeline import PipelineModel

loaded_model = PipelineModel.load("./model")
new_predictions = loaded_model.transform(test_data)
new_predictions.select("genre", "prediction", "probability").show(truncate=False)

[Stage 225:>                                                        (0 + 1) / 1]

+-----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|genre|prediction|probability                                                                                                                                                      |
+-----+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|pop  |0.0       |[0.2518460796235216,0.18992968652631886,0.15803029715607506,0.14351421537232353,0.13788677653531756,0.08305020624373007,0.03180815442642361,0.003934584116289681]|
|pop  |0.0       |[0.2518460796235216,0.18992968652631886,0.15803029715607506,0.14351421537232353,0.13788677653531756,0.08305020624373007,0.03180815442642361,0.003934584116289681]|
|pop  |0.0       |[0.2518460796235216,0.18992968652631886,0.15803029715607506,0.143514215372323

                                                                                

In [25]:
song_lyrics = """I won't let you down
I will not give you up
Gotta have some faith in the sound
It's the one good thing that I've got
I won't let you down
So please don't give me up
Because I would really, really love to stick around
Oh, yeah
Heaven knows I was just a young boy
Didn't know what I wanted to be (Didn't know what I wanted to be)
I was every little hungry schoolgirl's pride and joy
And I guess it was enough for me (And I guess it was enough for me)
To win the race, a prettier face
Brand new clothes and a big fat place
On your rock and roll TV (Rock and roll TV)
But today the way I play the game is not the same, no way
Think I'm gonna get me some happy
I think there's something you should know
(I think it's time I told you so)
There's something deep inside of me
(There's someone else I've got to be)
Take back your picture in a frame
(Take back your singing in the rain)
I just hope you understand
Sometimes the clothes do not make the man
All we have to do now
Is take these lies and make them true somehow
All we have to see
Is that I don't belong to you and you don't belong to me, yeah yeah"""
new_lyrics_df = spark.createDataFrame([(song_lyrics,)], ["lyrics"])

# Transform new lyrics using the saved pipeline
new_lyrics_transformed = loaded_model.transform(new_lyrics_df)

# Show predictions with probabilities
new_lyrics_transformed.select("prediction", "probability").show(truncate=False)

[Stage 231:>                                                        (0 + 1) / 1]

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|prediction|probability                                                                                                                                                      |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0.0       |[0.2518460796235216,0.18992968652631886,0.15803029715607506,0.14351421537232353,0.13788677653531756,0.08305020624373007,0.03180815442642361,0.003934584116289681]|
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+



                                                                                

In [51]:
spark.stop()