In [1]:
! pip install -q pyspark==3.3.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.3/281.3 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
import pyspark
import tqdm
import os
from glob import glob
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
from pyspark.sql import functions as func
from pyspark.sql.types import *
import warnings
import json
warnings.filterwarnings("ignore")
pd.set_option('display.max_rows', None)

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
import json
import re
import string


In [5]:
from pyspark.ml.feature import MinHashLSH,Tokenizer, VectorAssembler,  HashingTF
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.functions import array_to_vector
from pyspark.ml.feature import Word2Vec

In [6]:
# Create a spark session
conf = pyspark.SparkConf()
sc = pyspark.SparkContext(conf=conf)
config = pyspark.SparkConf().setAll([('spark.executor.memory', '20g'), ('spark.executor.cores', '5'), ('spark.cores.max', '5'), ('spark.driver.memory','100g')])
sc.stop()


sc = pyspark.SparkContext(conf=conf)
sc.setCheckpointDir("checkpoints/")
spark = pyspark.sql.SparkSession(sc)
sc.setLogLevel("ERROR")
spark

In [7]:
# Getting all the file names scraped from rate your music
rating = ['top','bottom','popular','esoteric','diverse']
# path = "data/genius_lyrics/{}/"
path = "/content/drive/MyDrive/Big_Data_Project/spark_y_rock_anthem/data/genius_lyrics/{}/"
all_paths = {}
for order in rating:
    temp = path.format(order)
    all_paths[order] = []
    for file in glob(temp+"*.json"):
        all_paths[order].append(file)

In [8]:
# Writing the schema for read
schema = StructType([
            StructField('Ranking', IntegerType()),
            StructField('Album', StringType()),
            StructField('Artist Name', StringType()),
            StructField('Release Date', StringType()),
            StructField('Genres', StringType()),
            StructField('Descriptors', StringType()),
            StructField('Average Rating', StringType()),
            StructField('spotify album uri', StringType()),
            StructField('spotify artist uri', StringType()),
            StructField('spotify track uri', StringType()),
            StructField('spotify track name', StringType()),
            StructField('spotify track number', IntegerType()),
            StructField('spotify disc number', IntegerType()),
            StructField('spotify track popularity', IntegerType()),
            StructField('spotify track duration', IntegerType()),
            StructField('spotify track features', MapType(StringType(), StringType())),
            StructField('spotify artist name', StringType()),
            StructField('spotify artist popularity', IntegerType()),
            StructField('spotify artist followers', IntegerType()),
            StructField('spotify artist genres', ArrayType(StringType())),
            StructField('lyrics', StringType())
            ])

In [9]:
# Reading the dataframe
counter = 0
for order in all_paths.keys():
    
    # Search for a specific category - comment the next two lines if you want to include all categories
    if order != 'top':
        continue
    
    # Iterating through all the file paths in that category
    for j_file in tqdm.tqdm(all_paths[order]):
        
        # Getting the year from the file path
        year = j_file[-9:-5]
        
        # Look for a specific year -  comment the next two lines if you want to include all years+
        if int(year) < 2021 or int(year) == 2023:
            continue
        
        counter+=1

        # Opening a file 
        with open(j_file) as f:
            data_dict = json.load(f)

        if counter == 1:
            # creating a dataframe
            df = spark.createDataFrame(data_dict, schema = schema)
        else:
            try:
              df = df.unionAll(spark.createDataFrame(data_dict, schema = schema))
            except:
              print("Issue Reading")
              continue


100%|██████████| 25/25 [00:07<00:00,  3.53it/s]


In [10]:
# Step 3: Preprocess the lyrics
stopwords = set(['a', 'an', 'the', 'and', 'or', 'if', 'then', 'this', 'that'])  # fill in with actual stopwords


In [11]:
# Creating a udf to clean the database lyrics
@udf(returnType= StringType())
def preprocess_text(text):
    res = text.split("Lyrics",1)[1]
    res = res.split("Embed")[0][:-1]
    res = "\n".join([i for i in res.split("\n") if i!=''])
    text = res.lower()
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = ' '.join([word for word in text.split() if word not in stopwords])
    # text = text.split("\n")
    return text

In [12]:
# Creating a udf to clean the partial query
@udf(returnType= StringType())
def preprocess_query_text(text):
    text = text.lower()
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = ' '.join([word for word in text.split() if word not in stopwords])
    # text = text.split("\n")
    return text

In [13]:
temp = df.where("lyrics != 'None'")

In [14]:
temp.limit(1).toPandas()

Unnamed: 0,Ranking,Album,Artist Name,Release Date,Genres,Descriptors,Average Rating,spotify album uri,spotify artist uri,spotify track uri,...,spotify track number,spotify disc number,spotify track popularity,spotify track duration,spotify track features,spotify artist name,spotify artist popularity,spotify artist followers,spotify artist genres,lyrics
0,1,Ants From Up There,"Black Country, New Road",4 February 2022,Art Rock Post-Rock Chamber Pop,longing melancholic passionate lush sentimenta...,4.02,spotify:album:21xp7NdU1ajmO1CX0w2Egd,spotify:artist:3PP6ghmOlDl2jaKaH0avUN,spotify:track:2UEH1NjNHGsoEIr3GKLhNR,...,2,1,56,216680,"{'loudness': '-8.937', 'acousticness': '0.2', ...","Black Country, New Road",58,213028,"[crank wave, indie rock, london indie, uk post...",52 ContributorsChaos Space Marine Lyrics\nAnd ...


In [15]:
temp1 = temp.select("spotify track name", preprocess_text("lyrics").alias('cleaned lyrics'))

In [18]:
temp1.limit(1).toPandas()

Unnamed: 0,spotify track name,cleaned lyrics
0,Chaos Space Marine,though england is mine i must leave it all beh...


In [16]:
# Tokenize the lyrics column
tokenizer = Tokenizer(inputCol="cleaned lyrics", outputCol="lyrics_token")
temp2 = tokenizer.transform(temp1)

In [None]:
temp2.limit(1).toPandas()

Unnamed: 0,spotify track name,cleaned lyrics,lyrics_token
0,Lean Beef Patty,in morning maybe we could start family i wanna...,"[in, morning, maybe, we, could, start, family,..."


In [17]:
# Creating another udf to make the words into a set
@udf(returnType= ArrayType(StringType()))
def unique_words(arr):
    arr = list(set(arr))
    return arr

In [18]:
# Getting the unique words in a lyric
temp3 = temp2.select("spotify track name","cleaned lyrics","lyrics_token",unique_words('lyrics_token').alias("unique words"))

In [None]:
temp3.limit(1).toPandas()

Unnamed: 0,spotify track name,cleaned lyrics,lyrics_token,unique words
0,Lean Beef Patty,in morning maybe we could start family i wanna...,"[in, morning, maybe, we, could, start, family,...","[eight, we, face, ye, how, hate, for, myself, ..."


In [19]:
# Running the hash transform
hashing_tf = HashingTF(inputCol="unique words", outputCol="hash features")
raw_features = hashing_tf.transform(temp3).limit(5)

In [20]:
# Checking the result
op = raw_features.limit(1).toPandas()

In [21]:
# Looking at one value of the feature
op['hash features'][0]

SparseVector(262144, {5381: 1.0, 6099: 1.0, 7336: 1.0, 12109: 1.0, 12409: 1.0, 14273: 1.0, 17734: 1.0, 17893: 1.0, 18184: 1.0, 19036: 1.0, 22575: 1.0, 24016: 1.0, 24175: 1.0, 25000: 1.0, 25491: 1.0, 27308: 1.0, 27576: 1.0, 29440: 1.0, 30950: 1.0, 31015: 1.0, 33053: 1.0, 34121: 1.0, 35844: 1.0, 38698: 1.0, 39928: 1.0, 42404: 1.0, 45252: 1.0, 45843: 1.0, 48531: 1.0, 50001: 1.0, 55039: 1.0, 55639: 1.0, 56794: 1.0, 56808: 1.0, 58672: 1.0, 67009: 1.0, 68538: 1.0, 70998: 1.0, 71789: 1.0, 75958: 1.0, 77053: 1.0, 77767: 1.0, 79876: 1.0, 80998: 1.0, 81566: 1.0, 82065: 1.0, 89356: 1.0, 89833: 1.0, 91878: 1.0, 92032: 1.0, 103545: 1.0, 106213: 1.0, 106776: 1.0, 106841: 1.0, 109208: 1.0, 109687: 1.0, 109996: 1.0, 113673: 1.0, 117155: 1.0, 117491: 1.0, 121517: 1.0, 123257: 1.0, 124710: 1.0, 131151: 1.0, 132133: 1.0, 134304: 1.0, 140315: 1.0, 140784: 1.0, 145207: 1.0, 146139: 1.0, 147136: 1.0, 147738: 1.0, 150152: 1.0, 150319: 1.0, 151536: 1.0, 153423: 1.0, 163000: 1.0, 164698: 1.0, 168976: 1.0, 1695

In [86]:
# Initialize MinhashLSH
mh = MinHashLSH(inputCol="hash features", outputCol="minhashlsh", numHashTables=100, seed=123)

# Fit the model to the data
model = mh.fit(raw_features)

# # Calculate hashes for the data
# hashed = model.transform(raw_features)

In [89]:
# Enter the input query
user_inp = input("Enter the query: ")

Enter the query: And though England is mine I must leave it all behind The war is over Lift the anchors, set an open course For New York state lines I think of all that went wrong The sailor boys light up in song And they sing of London Love they made there, will it really last any time? What's that that you said to me? Oh, I'm a chaos space marine So what? I love you Darling, will you take my metal hand? It's cold In time, you will find These things take up space inside your mind Where you could be keeping honest thoughts of the sea Alone  So I'm l\u0435aving this body And I'm never coming home again, y\u0435ah I'll bury the axe here Between the window and the kingdom of men Oh, I'm becoming a worm now And I'm looking for a place to live, yeah Here I come now You might also like 


In [90]:
query_df = spark.createDataFrame([(user_inp,)], ["partial query"])

In [91]:
query_df.toPandas()

Unnamed: 0,partial query
0,And though England is mine I must leave it all...


In [92]:
# Clean the lyrics
q_temp1 = query_df.select(preprocess_query_text("partial query").alias('cleaned lyrics'))

In [73]:
q_temp1.toPandas()

Unnamed: 0,cleaned lyrics
0,though england is mine i must leave it all beh...


In [93]:
# Tokenize the lyrics
tokenizer = Tokenizer(inputCol="cleaned lyrics", outputCol="lyrics_token")
q_temp2 = tokenizer.transform(q_temp1)

In [75]:
q_temp2.toPandas()

Unnamed: 0,cleaned lyrics,lyrics_token
0,though england is mine i must leave it all beh...,"[though, england, is, mine, i, must, leave, it..."


In [94]:
# Getting the unique words in a lyric
q_temp3 = q_temp2.select("cleaned lyrics","lyrics_token",unique_words('lyrics_token').alias("unique words"))

In [77]:
q_temp3.toPandas()

Unnamed: 0,cleaned lyrics,lyrics_token,unique words
0,though england is mine i must leave it all beh...,"[though, england, is, mine, i, must, leave, it...","[ill, course, for, they, could, find, men, so,..."


In [95]:
# Running the hash transform
hashing_tf = HashingTF(inputCol="unique words", outputCol="hash features")
q_raw_features = hashing_tf.transform(q_temp3)

In [111]:
query_inp = q_raw_features.select("hash features").toPandas()["hash features"][0]

In [113]:
similar_songs = model.approxNearestNeighbors(raw_features, query_inp, 1)

In [114]:
res = similar_songs.toPandas()

In [115]:
res

Unnamed: 0,spotify track name,cleaned lyrics,lyrics_token,unique words,hash features,minhashlsh,distCol
0,Chaos Space Marine,though england is mine i must leave it all beh...,"[though, england, is, mine, i, must, leave, it...","[ill, course, for, they, could, find, yeah1, m...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[[29682208.0], [9762427.0], [271433.0], [32050...",0.145299
