In [1]:
# import the libraries
import nltk
import pandas as pd
import numpy as np
import string
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.corpus import words
from nltk.metrics.distance import edit_distance
from nltk.util import ngrams as n_grams
from nltk.collocations import BigramAssocMeasures, BigramCollocationFinder

# download nltk resources for query preprocessing
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

from spellchecker import SpellChecker
import gensim.downloader
import spacy
from sklearn.metrics.pairwise import cosine_similarity

import kagglehub
import os
from langdetect import detect

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/akshithabhashetty/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/akshithabhashetty/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/akshithabhashetty/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [2]:
# load kaggle dataset
path = kagglehub.dataset_download("nikhilnayak123/5-million-song-lyrics-dataset")

# read the dataset
data_file = os.path.join(path, os.listdir(path)[0])
df = pd.read_csv(data_file)

In [None]:
# filter for non-empty lyrics with at least some content
df = df[df['lyrics'].apply(lambda x: isinstance(x, str) and len(x.strip()) > 50)]

# take 10,000 random samples since its too large to use completely
df_subset = df.sample(n=10000, random_state=42)

# create a music corpus with tokenized words for the phrase finder
music_corpus = [] 
# iterate through lyrics column
for lyrics in df_subset['lyrics']:
    if isinstance(lyrics, str):
        # tokenize the lyrics into individual words and store
        tokens = word_tokenize(lyrics.lower())

        # only keep ASCII tokens (filtering for only english words)
        english_tokens = []
        for token in tokens:
            # check if the token contains only ASCII characters 
            if all(ord(c) < 128 for c in token):
                english_tokens.append(token)
        
        # if most tokens are ASCII, consider it English
        if len(english_tokens) > (0.7 * len(tokens)):
            music_corpus.extend(english_tokens)

In [None]:
# create a set of english words from nltk
english_words = set(words.words())

In [None]:
# create a set of stopwords
stop_words = set(stopwords.words('english'))

# remove a small subset of descriptive words considered as stopwords
# could be important in understanding context/intent in query
stop_words -= {'like', 'with', 'without', 'for', 'similar', 'to', 'in', 'by', 'as'}

In [None]:
# remove stopwords from tokenized query
def remove_stopwords(tokens) :
    filtered_tokens = []
    # iterate through tokens and keep words that are not in the stopwords
    for token in tokens:
        if token not in stop_words:
            filtered_tokens.append(token)
    return filtered_tokens

In [None]:
# initialize Porter stemmer
stemmer = PorterStemmer()

# initialize the lemmatizer
lemmatizer = WordNetLemmatizer()

In [None]:
# textually normalizes the tokens (removes suffixes or finds root words)
def textually_normalize_tokens(tokens, normalizer, stem=True):
    normalized_tokens = []
    # iterate through token and store stemmed token
    for token in tokens:
        # if it is a stemmer, then stem otherwise lemmatize
        if stem:
            normalized_tokens.append(normalizer.stem(token))
        else:
            normalized_tokens.append(normalizer.lemmatize(token))

    return normalized_tokens

In [None]:
# preprocess query at basic level with tokenization, removing stopwords, and normalizing tokens
def preprocess_query_initial(query, normalizer, stem=True):
    # lowercase the query and then tokenize using nltk word tokenizer
    query_tokens = word_tokenize(query.lower())

    # remove stopwords from the query tokens
    filtered_tokens = remove_stopwords(query_tokens)

    # normalize tokens with lemmatizer or stemmer
    normalized_tokens = textually_normalize_tokens(filtered_tokens, normalizer, stem)

    return normalized_tokens


In [None]:
# spell checking the word using levenshtein distance
def levenshtein_autocorrect(word, threshold=2):
    # if the word is an actual word, return it back as it was probably spelled correctly
    if word in english_words:
        return word
    
    # initialize a list of possible correct words
    possible_corrections = []
    # iterate through each word in the vocabulary to compare edit distance
    for dict_word in english_words:
        # if the difference in length between the word and the vocabulary word 
        # is less than or equal to the threshold, then check the edit distance. 
        # otherwise it is automatically not a possible correction
        if abs(len(dict_word) - len(word)) > threshold:
            # find the edit distance between the two words
            distance = edit_distance(word, dict_word)
            # if the distance is less than or equal to the threshold, store the word and the distance
            if distance <= threshold:
                possible_corrections.append((dict_word, distance))
    # if there are any possible corrections found for the word, return the one with the least distance
    if possible_corrections:
        return sorted(possible_corrections, key=lambda x: x[1])[0][0]
    # if no possible corrections found, return the word itself
    return word

In [None]:
# spell checking the query using levenshtein distance
def autocorrect_query(tokens):
    # iterate through the tokens and use levenshtein distance to correct if appropriate
    for index, token in enumerate(tokens):
        tokens[index] = levenshtein_autocorrect(token)
    return tokens

In [None]:
# spell checking the query
def autocorrect_query(tokens):
    spell = SpellChecker()
    # iterate through the tokens and if the token is mispelled (not seen in spell), 
    # then correct and replace in the list of tokens
    for index, token in enumerate(tokens):
        if token not in spell:
            tokens[index] = spell.correction(token)
    return tokens

In [None]:
# load the pre-trained word2vec model, potentially fine-tune later
word2vec_model = gensim.downloader.load('word2vec-google-news-300')

In [None]:
# find similar words from the word2vec model
def add_similar_query_tokens(tokens):
    similar_terms = []
    # for each token in the query, find 2 similar terms and append
    for token in tokens:
        # find the 2 most similar words to this token
        similar_terms_for_token = word2vec_model.most_similar(token, 3)
        final_similar_terms_for_token = []
        # remove any terms that are too different (low similarity)
        for term, similarity_score in similar_terms_for_token:
            if len(final_similar_terms_for_token) < 2:
                if term not in tokens and similarity_score >= 0:
                    final_similar_terms_for_token.append(term)
        # append the similar terms for this token to the overall list
        similar_terms.extend(final_similar_terms_for_token)
    # return the similar terms for this query
    return similar_terms

In [None]:
# load the spacy model
nlp = spacy.load("en_core_web_sm")


In [None]:
# extract named entities from the query using spacy
def extract_named_entities(query):
    # pass query through spacy model
    query_ner = nlp(query)
    
    # iterate through the entities that were recognized and store in a list
    entities = []
    for ent in query_ner.ents:
        entities.append((ent.text, ent.label_))
    
    # return list of entities containing the actual text and then the label
    return entities

In [None]:
# function that preprocesses the query with basic steps and then enhances with autocorrect, ner, and expanded terms
def preprocess_query_enhanced(query):
    # first preprocess with initial steps
    normalized_tokens = preprocess_query_initial(query, lemmatizer, stem=False)

    # autocorrect the tokens to fix any mispellings
    corrected_tokens = autocorrect_query(normalized_tokens)

    # expand query with word embeddings
    expanded_terms = add_similar_query_tokens(corrected_tokens)

    # extract named entities using spacy
    entities = extract_named_entities(query)

    # add the entities to the query for extra weight
    for entity_text, entity_type in entities:
        normalized_tokens.append(entity_text)

    return normalized_tokens, expanded_terms

In [None]:
# function that finds negations in the tokens
def identify_negations(tokens):
    # list of preliminary negation terms
    negation_terms = {'not', 'no', "don't", "doesn't", 'never', 'without', "won't", 'neither', 'nor'}
    # initialize the lists that will contain the detected negations and the corresponding tokens
    result = {'negation_terms': [], 'negated_terms': []}
    
    # iterate through the tokens and check if there is a negation and do the appropriate steps
    for index, token in enumerate(tokens):
        # if the token is a negation, then find the tokens it corresponds to
        if token.lower() in negation_terms:
            # store the negation
            result['negation_terms'].append(token)
            # if there is a token after this one, store the token that was negated as well
            if index + 1 < len(tokens):
                result['negated_terms'].append(tokens[index + 1])
    # return the negations and their corresponding terms
    return result

In [None]:
# function that removes negated terms and negations
def handle_negations(tokens, expanded_terms):
    # find the negations and the terms that were negated
    negation_info = identify_negations(tokens)
    
    # if there are no terms that were negated, just return the tokens again since there is nothing to be done
    if not negation_info['negated_terms']:
        return tokens, expanded_terms
    
    # find terms that must be removed from the query
    exclude_terms = set()
    # iterate through the negated terms, if similar ones found remove them
    for negated_term in negation_info['negated_terms']:
        # add the negated term
        exclude_terms.add(negated_term)
        # find any similar words
        if negated_term.lower() in word2vec_model:
            similar_words = word2vec_model.most_similar(negated_term, topn=5)
            # add the similar words
            for word, score in similar_words:
                exclude_terms.add(word)
    
    # initialize lists
    filtered_tokens = []
    filtered_expanded_terms = []
    # iterate through the tokens and remove negated terms from tokens and expanded terms 
    for token in tokens:
        # if the token is not to be excluded, store it
        if token not in exclude_terms:
            filtered_tokens.append(token)
            # if the token is not to be excluded and its an expanded term, store it
            if token in expanded_terms:
                filtered_expanded_terms.append(token)

    return filtered_tokens, filtered_expanded_terms

In [None]:
# ngram function
def extract_ngrams(tokens, n=2):
    # make a list of the ngrams
    ngrams = list(n_grams(tokens, n))
    # iterate through the ngrams and make each ngram a phrase or single string
    extracted_ngrams = []
    for ngram in ngrams:
        extracted_ngrams.append(' '.join(ngram))
    return extracted_ngrams

In [None]:
# find the word phrases using pointwise mutual information
def find_phrases(tokens, top_n=3):
    # separate all of the string tokens from any punctuation tokens
    string_tokens = []
    for token in tokens:
        # if the token is not punctuation, store it
        if token not in string.punctuation:
            string_tokens.append(token)
    
    # if there are fewer than 5 tokens, there are probably no phrases so return nothing
    if len(tokens) < 5:
        return []
    
    # intialize bigram measures and finders to find words phrases of length two
    bigram_measures = BigramAssocMeasures()
    finder = BigramCollocationFinder.from_words(music_corpus) 

    # keep pairs that appear more than once
    finder.apply_freq_filter(15)
    
    # find the top n phrases
    phrases = finder.nbest(bigram_measures.pmi, top_n * 10)

    # filter for only english phrases by iterating through the phrases and using langdetect
    english_phrases = []
    for word1, word2 in phrases:
        # if the phrase is english, store
        if word1 in english_words and word2 in english_words:
            english_phrases.append(' '.join((word1, word2)))

        # if enough english phrases found, stop looking for more
        if len(english_phrases) >= top_n:
            return english_phrases

    # return whatever is in english_phrases
    return english_phrases

In [None]:
# calculate the importance of the phrase
def compute_phrase_importance(text, ngram):
    # make the text and ngrams sets to get the unique words from each
    ngram_words = set(ngram.lower().split())
    text_words = set(text.lower().split())
    
    # if the words in ngram are present in the words in the text, then calculate a score as the ratio
    if ngram_words.issubset(text_words):
        return len(ngram_words) / len(text_words)
    # otherwise return zero as it is not important
    return 0.0

In [None]:
# expand the query using the context and ngraphs
def context_sensitive_expansion(query, expanded_terms, ngrams):
    # filter through the expanded terms to keep only the ones that are important 
    relevant_expanded_terms = []    
    for term in expanded_terms:
        term_relevance = 0.0
        # for each ngram phrase, find the importance of the query containing the expanded term to find its importance/relevance
        for ngram in ngrams:
            importance = compute_phrase_importance(query + " " + term, ngram)
            term_relevance += importance
        # if the term is relevant and not an ngram, then store because it is important
        if term_relevance > 0 or not ngrams:
            relevant_expanded_terms.append(term)
    
    return relevant_expanded_terms

In [None]:
# preprocess query, then enhance it, and next expand with negation handling and important phrases
def preprocess_query_context_aware(query):
    # do the enhanced query preprocessing
    tokens, expanded_terms = preprocess_query_enhanced(query)

    # get the ngrams and relevant word phrases
    bigrams = extract_ngrams(tokens, 2)
    trigrams = extract_ngrams(tokens, 3)
    phrases = find_phrases(tokens)

    # combine all these phrases
    phrases = list(set(bigrams + trigrams + phrases))
    
    # handle any negations
    filtered_tokens, filtered_expanded_terms = handle_negations(tokens, expanded_terms)
    
    # do the context-sensitive expansion
    context_sensitive_expanded_terms = context_sensitive_expansion(query, filtered_expanded_terms, phrases)

    # return the tokens with the additional terms/phrases
    return filtered_tokens, context_sensitive_expanded_terms

In [None]:
# create the embeddings for the term
def calculate_term_embedding(term):
    # make sure the term is not a phrase so split in case
    term_words = term.lower().split()
    embeddings = []
    # for each word in the term, find the embedding
    for word in term_words:
        # if there is an embedding for the word, store it
        if word in word2vec_model:
            embeddings.append(word2vec_model[word])
    # if embeddings were found, find the average of them for the embedding of the entire term
    if embeddings:
        return np.mean(embeddings, axis=0)
    # otherwise, just return zeroes as no embeddings were found
    return np.zeros(100)

In [None]:
# use maximal marginal relevance to find the diverse and relevant terms
def apply_maximal_marginal_relevance(query_embedding, possible_terms, lambda_param=0.5, num_terms=10):
    # if there are no possible terms, return nothing
    if not possible_terms:
        return []
    
    # find the embeddings of each possible term
    possible_term_embeddings = []
    for term in possible_terms:
        possible_term_embeddings.append(calculate_term_embedding(term))
    # make it an array to do the cosine similarity
    possible_term_embeddings = np.array(possible_term_embeddings)
    
    # find the similarities between the possible term's embeddings and the query 
    query_similarities = cosine_similarity(possible_term_embeddings, query_embedding.reshape(1, -1)).flatten()
    
    # find the top num terms possible terms to use
    selected_indices = []
    remaining_indices = list(range(len(possible_terms)))
    # iterate the same number of times as the number of terms wanted or all the possible terms, whichever is fewer
    for iteration in range(min(num_terms, len(possible_terms))):
        # if there are no more terms to check, stop
        if not remaining_indices:
            break
            
        # find the mmr scores for the remaining possible terms
        mmr_scores = []
        for index in remaining_indices:
            # if there are terms that have been selected
            if selected_indices:
                # get their embeddings
                selected_embeddings = possible_term_embeddings[selected_indices]

                # find similarities between this possible term and those that have been selected
                similarities_to_selected = cosine_similarity(possible_term_embeddings[index].reshape(1, -1), selected_embeddings).flatten()
                
                # calculate the mmr using the similarities
                mmr = lambda_param * query_similarities[index] - (1 - lambda_param) * np.max(similarities_to_selected)
            # otherwise just use the query similarities as the mmr
            else:
                mmr = query_similarities[index]
            # store the mmr score for this possible term    
            mmr_scores.append((index, mmr))
        
        # find the term with the best mmr score
        selected_index, score = max(mmr_scores, key=lambda x: x[1])

        # update the selected indices and the remaining indices appropriately
        selected_indices.append(selected_index)
        remaining_indices.remove(selected_index)
    
    # get the terms that correspond to the selected indices and return
    chosen_terms = []
    for index in selected_indices:
        chosen_terms.append(possible_terms[index])
    return chosen_terms

In [None]:
# preprocess query, enhance, expand, and filter the expanded terms to make sure only relevant expansions are added
def preprocess_query_diversity(query):
    # do the context aware preprocessing
    tokens, context_sensitive_terms = preprocess_query_context_aware(query)

    # find the embedding of the query
    query_embedding = calculate_term_embedding(query)

    # find the diverse terms
    diverse_terms = apply_maximal_marginal_relevance(query_embedding, context_sensitive_terms)

    print(diverse_terms)

    # return the expanded query
    return ' '.join(tokens) + " " + ' '.join(diverse_terms)

In [None]:
preprocess_query_diversity('i want rock not metal')

['Metal', 'wanna', 'rockers', 'prefer', 'metals', 'folksters']


'want rock metal Metal wanna rockers prefer metals folksters'