___Next Part---


In [None]:
# Step 1: PySpark Setup (Skip if already done)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SpotifyContentBased").getOrCreate()

In [None]:
# Load the song/lyrics dataset (columns: artist, song, link, text).
# The `text` column holds quoted lyrics that span many lines. Without
# multiLine=True Spark treats every newline as a record boundary, so lyric
# lines end up in the `artist`/`song` columns with nulls elsewhere.
# escape='"' makes doubled quotes ("") inside a quoted field parse as a literal quote.
spotify_df = spark.read.csv(
    "/content/spotify_millsongdata.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)
spotify_df.printSchema()
spotify_df.show(5)


root
 |-- artist: string (nullable = true)
 |-- song: string (nullable = true)
 |-- link: string (nullable = true)
 |-- text: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+
|              artist|                song|                link|                text|
+--------------------+--------------------+--------------------+--------------------+
|                ABBA|Ahe's My Kind Of ...|/a/abba/ahes+my+k...|Look at her face,...|
|And it means some...|                null|                null|                null|
|Look at the way t...|                null|                null|                null|
|How lucky can one...|                null|                null|                null|
|She's just my kin...| she makes me fee...|                null|                null|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# All feature transformers used in this cell are imported up front; Tokenizer
# and StopWordsRemover were previously used without being imported, which
# raises NameError on a fresh kernel.
from pyspark.ml.feature import HashingTF, IDF, Normalizer, StopWordsRemover, Tokenizer
from pyspark.sql.functions import col

# Drop rows with nulls in the 'text' column and cast to string
spotify_df_clean = spotify_df.filter(col("text").isNotNull()).withColumn("text", col("text").cast("string"))

# Tokenize song text into a "words" array column
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(spotify_df_clean)

# Remove stopwords
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
filtered_data = remover.transform(words_data)

# Convert text to term-frequency features, hashed into 10,000 buckets
hashingTF = HashingTF(inputCol="filtered_words", outputCol="raw_features", numFeatures=10000)
featurized_data = hashingTF.transform(filtered_data)

# Fit IDF weights on the whole corpus and rescale TF -> TF-IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)

# Normalize the TF-IDF vectors (Normalizer with its default settings)
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
feature_df = normalizer.transform(rescaled_data)

# Show sample features
feature_df.select("song", "norm_features").show(5, truncate=False)


+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|song                 |norm_features                                                                                                                                                |
+---------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Ahe's My Kind Of Girl|(10000,[4034,5825,8881,9939],[0.41897927644876937,0.6108256664678718,0.5709601224907664,0.35405212838772054])                                                |
|Andante, Andante     |(10000,[760,855,4928,7017],[0.5473989376671548,0.4282587180383883,0.5363405191786913,0.4788399742629246])                                                    |
|As Good As New       |(10000,[281,6451,7779],[0.6236154426473773,0.5754498581651177,0.529

In [None]:
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def cosine_similarity(v1, v2):
    """Return the cosine similarity between two vectors.

    Both arguments must expose ``dot(other)`` and ``norm(p)`` methods; only
    those two methods are used here.

    Returns:
        ``dot(v1, v2) / (||v1||_2 * ||v2||_2)`` as a Python float, or ``0.0``
        when either vector has zero L2 norm (avoids division by zero).
    """
    numerator = float(v1.dot(v2))
    length_a, length_b = float(v1.norm(2)), float(v2.norm(2))
    has_zero_length = length_a == 0.0 or length_b == 0.0
    return 0.0 if has_zero_length else numerator / (length_a * length_b)

# Pick a reference song and look up its normalized TF-IDF vector.
reference_song = "Andante, Andante"
ref_row = feature_df.filter(col("song") == reference_song).select("norm_features").first()
if ref_row is None:
    # first() returns None when no row matches; fail with a clear message
    # instead of "'NoneType' object is not subscriptable".
    raise ValueError(f"Reference song {reference_song!r} not found in feature_df")
ref_vector = ref_row["norm_features"]

# Broadcast the vector for efficiency
reference_vector_broadcast = spark.sparkContext.broadcast(ref_vector)

# UDF to compute similarity with the broadcast reference vector.
# Defined after the broadcast so the closure's dependency is visible in reading order.
cosine_sim_udf = udf(lambda x: cosine_similarity(reference_vector_broadcast.value, x), FloatType())

# Add similarity scores
similar_songs = feature_df.withColumn("similarity", cosine_sim_udf(col("norm_features")))

# Rank all other songs by similarity (excluding the reference song itself)
top_similar = similar_songs.filter(col("song") != reference_song).orderBy(col("similarity").desc())

# Display the 5 most similar songs
top_similar.select("song", "similarity").show(5, truncate=False)


+--------------------------+----------+
|song                      |similarity|
+--------------------------+----------+
|Olive Me                  |0.64241207|
|Take Me                   |0.6074708 |
|In Limbo                  |0.57636666|
|Come Into My Life         |0.5473989 |
|Don't Cry For Me Argentina|0.5473989 |
+--------------------------+----------+
only showing top 5 rows

