In [1]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("carlosgdcj/genius-song-lyrics-with-language-information")

Downloading from https://www.kaggle.com/api/v1/datasets/download/carlosgdcj/genius-song-lyrics-with-language-information?dataset_version_number=1...


100%|██████████| 3.04G/3.04G [00:36<00:00, 89.3MB/s]

Extracting files...





In [2]:
import pandas as pd

data = pd.read_csv(path+"/song_lyrics.csv", skiprows=lambda x: x % 20 != 0)

In [4]:
# Filter Data by English Songs only and make sure the content-type is a song.

filtered_lang_data = data[data['language'] == 'en']

filtered_data = filtered_lang_data[filtered_lang_data['tag'] != 'misc']

In [5]:
import re
import string
import numpy as np
from nltk.stem import SnowballStemmer


# ------------Exact match search and ranking-----------------#

stemmer = SnowballStemmer("english")

_PUNCT_TO_REMOVE = string.punctuation.replace("'", "")
_RE_BRACKET_TAGS = re.compile(r"\[.*?\]")


# ------------Removing tags from lyrics---------------------#
def preprocess_lyrics(lyric: str) -> str:
    if not isinstance(lyric, str):
        return ""

    text = lyric.lower()
    text = _RE_BRACKET_TAGS.sub("", text)
    text = text.translate(str.maketrans("", "", _PUNCT_TO_REMOVE))
    text = re.sub(r"\s+", " ", text).strip()
    return text

def add_processed_column(df, source_col: str = "lyrics"):
    if "processed" not in df.columns:
        df = df.copy()
        df["processed"] = df[source_col].apply(preprocess_lyrics)
    return df

#-----------------Using Stemmer for additional search----------------#
def stem_query(query):
    return " ".join(stemmer.stem(tok) for tok in query.split())


#------Simple Search for Exact matching with stemming as well---------#
def simple_search(query,
                  data,
                  top_k: int = 10):
    raw_pattern  = re.escape(preprocess_lyrics(query))
    stem_pattern = re.escape(stem_query(query))

    exact_hits = data[data['processed'].str.contains(raw_pattern,  regex=True)]
    stem_hits  = data[data['processed'].str.contains(stem_pattern, regex=True)]

    parts = [hits for hits in (exact_hits, stem_hits) if not hits.empty]
    if not parts: # no match at all
        return pd.DataFrame(columns=data.columns)

    merged = pd.concat(parts, ignore_index=True)

    # Normalize the popularity into a score
    if merged['views'].nunique() > 1:
        range = merged['views'].max() - merged['views'].min()
        merged['norm_views'] = (merged['views'] - merged['views'].min()) / range
    else:
        merged['norm_views'] = 1.0 # Full score

    merged = (merged
              .drop_duplicates('id')
              .sort_values('norm_views', ascending=False)
              .head(top_k))

    return merged.reset_index(drop=True)


Steps:

1. lowercase everything
2. remove punctuation
3. remove anything in brackets ([Verse 1], [Chorus], etc)
4. not sure if lemmatization/stemming is useful since we'd often want exact matches on lyrics

In [6]:
#------------------Add Processed Column------------------#
processed_data = add_processed_column(filtered_data)

In [7]:
from collections import Counter
import math


#---------------Adding Bigrams Search------------------#
def make_ngrams(tokens, n):
    ng = [tuple(tokens[i: i + n]) for i in range(0, len(tokens) - n + 1)]
    return ng

def ngram_dict(df, n):
    res = {}
    for index, row in df.iterrows():
        id_val = row['id']

        processed_text = row['processed']
        tokens = processed_text.split()

        ngrams = make_ngrams(tokens, n)

        res[id_val] = Counter(ngrams)
    return res

In [8]:
NGRAM_SIZE = 2

# id -> ngram count
id_nmap = ngram_dict(processed_data, NGRAM_SIZE)

# corpus ngram counts
corpus_counter = Counter()

for doc_ngrams in id_nmap.values():
    corpus_counter.update(doc_ngrams)

total_ngrams = sum(corpus_counter.values())
unique_ngrams = len(corpus_counter)

id_ncount = {doc_id: sum(doc_counter.values()) for doc_id, doc_counter in id_nmap.items()}

id_nunique = {doc_id: len(doc_counter) for doc_id, doc_counter in id_nmap.items()}

In [20]:
#-------------------BM25 BIGRAM SEARCH MODEL------------#

def get_top_matches(query, top_k=10):
    query = preprocess_lyrics(query)
    tokens = query.split()
    query_ngrams = make_ngrams(tokens, NGRAM_SIZE)
    V = unique_ngrams

    results = []

    for doc_id, doc_counter in id_nmap.items():
        total_ngrams_in_doc = id_ncount[doc_id]
        log_score = 0.0
        match_count = 0

        for ngram in query_ngrams:
            count = doc_counter.get(ngram, 0)
            if count > 0:
                match_count += 1
            prob = (count + 1) / (total_ngrams_in_doc + V)
            log_score += math.log(prob)

        if match_count > 0:
            results.append((doc_id, match_count, log_score))

    sorted_results = sorted(results, key=lambda x: (x[1], x[2]), reverse=True)

    return sorted_results[:top_k]

In [10]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.0-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.0


In [35]:
import os
from dotenv import load_dotenv

load_dotenv()  # take environment variables from .env.

google_api_key = os.getenv("GOOGLE_API_KEY")

if google_api_key:
    print("Google API Key loaded successfully.")
else:
    print("GOOGLE_API_KEY not found in .env file.")


GOOGLE_API_KEY not found in .env file.


In [12]:
from google import genai

In [13]:
client = genai.Client(api_key=google_api_key)


#-----------Using GenAI to generate embedings------------#
def generate_embeddings(text):
  result = client.models.embed_content(
        model="gemini-embedding-exp-03-07",
        contents="What is the meaning of life?")
  return result.embeddings[0]

In [16]:
generate_embeddings(processed_data['processed'].iloc[100])

ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource has been exhausted (e.g. check quota).', 'status': 'RESOURCE_EXHAUSTED'}}

In [15]:
processed_data['embedded'] = processed_data['processed'].apply(generate_embeddings)

ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'Resource has been exhausted (e.g. check quota).', 'status': 'RESOURCE_EXHAUSTED'}}

In [17]:
!pip install -U FlagEmbedding

Collecting FlagEmbedding
  Downloading FlagEmbedding-1.3.4.tar.gz (163 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/163.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting datasets>=2.19.0 (from FlagEmbedding)
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting ir-datasets (from FlagEmbedding)
  Downloading ir_datasets-0.5.10-py3-none-any.whl.metadata (12 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets>=2.19.0->FlagEmbedding)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets>=2.19.0->FlagEmbedding)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets>=2.19.0->FlagEmbedding)
  Downloading multiprocess-0.70.16-py311-none-an

In [None]:
from FlagEmbedding import BGEM3FlagModel

model = BGEM3FlagModel('BAAI/bge-m3',
                       use_fp16=True) # Setting use_fp16 to True speeds up computation with a slight performance degradation

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Fetching 30 files:   0%|          | 0/30 [00:00<?, ?it/s]

In [None]:
docs = data['processed'].tolist()
embeddings = model.encode(docs,
                          batch_size=16,
                          max_length=1024)['dense_vecs']

data['embedded'] = list(embeddings)

pre tokenize:  70%|██████▉   | 14963/21396 [24:25<56:47:19, 31.78s/it]

In [None]:
from sklearn.metrics.pairwise import cosine_similarity


#-------------------Vector Search Model---------------------#
def find_most_similar(lyrics):
  lyric_embedded = model.encode(lyrics)['dense_vecs']
  similarity = data['embedded'].apply(lambda x: cosine_similarity(lyric_embedded, x))
  return data.iloc[similarity.idxmax()]



In [33]:
from collections import defaultdict

"""
Now that we have three different searches.
We need to take each of their rankings for the top 10
and cross reference these in order to get the best results.
"""

def normalize_scores(results):
    if not results:
        return []
    scores = np.array([r["score"] for r in results])
    min_score, max_score = scores.min(), scores.max()
    norm_scores = (scores - min_score) / (max_score - min_score + 1e-12)
    for i, r in enumerate(results):
        r["score"] = norm_scores[i]
    return results


#------------------------Hybrid Search--------------------------#
def hybrid_ranked_results(query, data, top_k=10, alpha=0.33, beta=0.33, gamma=0.33):
    # Run all search models
    simple = simple_search(query, data, top_k=top_k)
    bm25 = get_top_matches(query, top_k=top_k)
    # embed = find_most_similar(query, data, top_k=top_k)

    def wrap(results, model):
      if model == "simple":
          return [{"id": row["id"], "score": row["norm_views"], "source": model} for _, row in results.iterrows()]
      elif model == "bm25":
          return [{"id": r[0], "score": r[2], "source": model} for r in results if len(r) == 3]
    # Add handling for "embed" when ready


    all_results = (
        normalize_scores(wrap(simple, "simple"))
        + normalize_scores(wrap(pd.DataFrame(bm25, columns=["id", "count", "score"]), "bm25"))
        # + normalize_scores(wrap(embed, "embed"))
    )

    combined_scores = defaultdict(float)

    for r in all_results:
        if r["source"] == "simple":
            combined_scores[r["id"]] += alpha * r["score"]
        elif r["source"] == "bm25":
            combined_scores[r["id"]] += beta * r["score"]
        elif r["source"] == "embed":
            combined_scores[r["id"]] += gamma * r["score"]

    sorted_ids = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
    top_ids = [doc_id for doc_id, _ in sorted_ids[:top_k]]

    return data[data["id"].isin(top_ids)]




In [34]:
hybrid_ranked_results("we the best music", processed_data, alpha=0.75, beta=0.25, gamma=0.0)

Unnamed: 0,title,tag,artist,year,views,features,lyrics,id,language_cld3,language_ft,language,processed
103800,Nobody,rap,DJ Khaled,2017,63091,"{""Nicki Minaj"",""Alicia Keys""}",[Intro: DJ Khaled & Nicki Minaj]\nWe the Best ...,3117583,en,en,en,we the best music another one young money alic...
153919,Expensive Car Radio,pop,Big Baller B,2019,93,{},[Intro: JAY-Z & DJ Khaled]\nAnd all that shit ...,4651353,en,en,en,and all that shit real too ah i ain't gon' sto...
177423,Off-White,rap,GodFearin,2020,214,{Realnya},[Chorus: Teejay Godfearing]\nAll of my shoes o...,5364498,en,en,en,all of my shoes offwhite ripped jeans white li...
190991,POPSTAR,rap,DJ Khaled,2020,1021462,{Drake},[Intro: DJ Khaled & Drake]\nBitches\nWe The Be...,5774099,en,en,en,bitches we the best music another one yeah dj ...
214560,HARD BEAT,rap,The Muffler Man,2021,4,{},"[Intro]\nI be on duh street\nYeah, I be on duh...",6513323,en,en,en,i be on duh street yeah i be on duh street har...
214563,Dead by Dusk Vol. 1,rap,The Muffler Man,2020,2,{},[Intro]\nWe the best music for real\n\n[Chorus...,6513384,en,en,en,we the best music for real pain rushing throug...


In [None]:
#-----------------Evaluation-----------------------#

# MAP@10
def average_precision(retrieved_docs, relevant_docs):
    score = 0.0
    hits = 0
    for i, doc_id in enumerate(retrieved_docs, 1):
        if doc_id in relevant_docs:
            hits += 1
            score += hits / i
    return score / max(1, len(relevant_docs))

def mean_average_precision(queries, relevance_dict, retriever_fn, top_k=10):
    total_ap = 0.0
    for query in queries:
        retrieved_docs = retriever_fn(query)["id"].tolist()
        relevant_docs = relevance_dict.get(query, set())
        total_ap += average_precision(retrieved_docs[:top_k], relevant_docs)
    return total_ap / len(queries)


# Recall@10
def recall_at_k(retrieved_docs, relevant_docs, k=10):
    return len(set(retrieved_docs[:k]) & relevant_docs) / len(relevant_docs)

def mean_recall(queries, relevance_dict, retriever_fn, top_k=10):
    total_recall = 0.0
    for query in queries:
        retrieved_docs = retriever_fn(query)["id"].tolist()
        relevant_docs = relevance_dict.get(query, set())
        total_recall += recall_at_k(retrieved_docs, relevant_docs, top_k)
    return total_recall / len(queries)


# TODO: Create a dataset for evaluation. Perhaps using manual annotation on a small set of queries and seeing how it compares.