In [1]:
%pip install nltk swifter opendatasets


Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Collecting kaggle
  Downloading kaggle-1.6.1.tar.gz (84 kB)
Collecting python-slugify
  Downloading python_slugify-8.0.1-py2.py3-none-any.whl (9.7 kB)
Collecting text-unidecode>=1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
Building wheels for collected packages: kaggle
  Building wheel for kaggle (setup.py): started
  Building wheel for kaggle (setup.py): finished with status 'done'
  Created wheel for kaggle: filename=kaggle-1.6.1-py3-none-any.whl size=111907 sha256=d3e625929f0c4a3c2a806b7a07314d0f69e4ddcf331571dd7ba906b6734302ce
  Stored in directory: c:\users\marc\appdata\local\pip\cache\wheels\56\e5\e8\da57097e519eca5372e1e4bd7d9d3e9fc2743c967b343c6a5b
Successfully built kaggle
Installing collected packages: text-unidecode, python-slugify, kaggle, opendatasets
Successfully installed kaggle-1.6.1 opendatasets-0.1.22 python-slugify-8.0.1 text-unidecode-1.3


In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import random
import re
import os
import string
import swifter
import torch
import opendatasets as od

import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Download Data

1. Option: download the full dataset and select some percentage. You will need to enter your username and kaggle API token when prompted. You can create an API token at https://www.kaggle.com > settings > API > Create New API Token.

2. Option: load preprocessed data and qrels (you can skip the section for preprocessing document and generating queries below)

In [3]:
# OPTION 1
dataset = 'https://www.kaggle.com/datasets/carlosgdcj/genius-song-lyrics-with-language-information/'
# Download the data set using opendatasets
od.download(dataset)

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: 

In [None]:
# We can't work with a 9GB file, so read 10% of the data at random
p = 0.1
# keep the header, then take only p% of lines
df = pd.read_csv(
    "./genius-song-lyrics-with-language-information/song_lyrics.csv",
    header=0,
    skiprows=lambda i: i > 0 and random.random() > p
)

In [None]:
# OPTION 2
import urllib.request

urllib.request.urlretrieve('https://f002.backblazeb2.com/file/ffactory-public/documents.pkl', 'documents.pkl')
urllib.request.urlretrieve('https://f002.backblazeb2.com/file/ffactory-public/qrels-train.pkl', 'qrels-train.pkl')
urllib.request.urlretrieve('https://f002.backblazeb2.com/file/ffactory-public/qrels-test.pkl', 'qrels-test.pkl')

NameError: name 'pd' is not defined

In [ ]:
df = torch.load('documents.pkl')
qrels_train = torch.load('qrels-train.pkl')
qrels_test = torch.load('qrels-test.pkl')

# Preprocessing

In [16]:
nltk.download("punkt")
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /Users/ffactory/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/ffactory/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [186]:
ps = PorterStemmer()
stopwords_en = set(stopwords.words('english'))

# Select only english songs and songs which contain verses (newlines)
df = df[df["language"] == "en"]
df = df[df["lyrics"].str.contains("\n")]

# Select columns we care about
df = df[["title", "lyrics", "views", "id"]]
df.rename(columns={"id": "doc_id"}, inplace=True)
df.set_index("doc_id", inplace=True)

df_proc = df.copy()
df_proc.rename(columns={"lyrics": "text"}, inplace=True)

# Convert to lowercase
df_proc["text"] = df_proc["text"].str.lower()
# Remove any non-alphanumeric / whitespace characters
df_proc["text"] = df_proc["text"].str.replace(re.compile(r"[^\w\s]"), "", regex=True)
# Remove newlines
df_proc["text"] = df_proc["text"].str.replace("\n", " ", regex=False)
# Remove text between square brackets
df_proc["text"] = df_proc["text"].str.replace(re.compile(r"\[.{0,100}\]"), "", regex=True)
# Split text into words
df_proc["text"] = df_proc["text"].str.rsplit()

In [15]:
print(f"number of english songs: {len(df_proc)}")
df_proc.head()

number of songs: 12295


Unnamed: 0,title,tag,artist,year,views,features,lyrics,id,language_cld3,language_ft,language
0,Killa Cam,rap,Cam'ron,2004,173166,"{""Cam\\'ron"",""Opera Steve""}","[Chorus: Opera Steve & Cam'ron]\nKilla Cam, Ki...",1,en,en,en
1,Can I Live,rap,JAY-Z,1996,468624,{},"[Produced by Irv Gotti]\n\n[Intro]\nYeah, hah,...",3,en,en,en
2,Forgive Me Father,rap,Fabolous,2003,4743,{},Maybe cause I'm eatin\nAnd these bastards fien...,4,en,en,en
3,Down and Out,rap,Cam'ron,2004,144404,"{""Cam\\'ron"",""Kanye West"",""Syleena Johnson""}",[Produced by Kanye West and Brian Miller]\n\n[...,5,en,en,en
4,Fly In,rap,Lil Wayne,2005,78271,{},"[Intro]\nSo they ask me\n""Young boy\nWhat you ...",6,en,en,en


In [187]:
# Remove stopwords and stem tokens
def remove_stopwords_and_stem(tokens):
    return " ".join([ps.stem(token) for token in tokens if token not in stopwords_en])


df_proc["text"] = df_proc["text"].swifter.apply(remove_stopwords_and_stem)

Pandas Apply:   0%|          | 0/12064 [00:00<?, ?it/s]

In [188]:
# Compare the original lyrics with the tokenized lyrics
(df.iloc[5]["lyrics"][0:100], df_proc.iloc[5]["text"][0:100])

("[Intro: Lil Wayne]\nHaha\nUh-huh\nNo homo (Young Mula, baby!)\nI say, he's so sweet, make her wanna lick",
 'intro lil wayn haha uhhuh homo young mula babi say he sweet make wanna lick wrapper remix babi vers ')

In [189]:
df_proc.head()

Unnamed: 0,title,views,text
0,Killa Cam,173166,choru opera steve camron killa cam killa cam c...
1,Can I Live,468624,produc irv gotti intro yeah hah yeah rocafella...
2,Forgive Me Father,4743,mayb caus im eatin bastard fiend grub carri pu...
3,Down and Out,144404,produc kany west brian miller intro camron kan...
4,Fly In,78271,intro ask young boy gon second time around gon...


In [190]:
# Save as a pickle file
torch.save(df_proc[["title", "text"]], 'documents.pkl')

# Test if the pickle file is saved correctly
df_reloaded = torch.load('documents.pkl')
df_reloaded.head()

Unnamed: 0,title,text
0,Killa Cam,choru opera steve camron killa cam killa cam c...
1,Can I Live,produc irv gotti intro yeah hah yeah rocafella...
2,Forgive Me Father,mayb caus im eatin bastard fiend grub carri pu...
3,Down and Out,produc kany west brian miller intro camron kan...
4,Fly In,intro ask young boy gon second time around gon...


# Generate Queries

In [151]:
# Functions to Select Verses
def getFirstVerses(lyricsString, amount):
    verseList = re.split('\n', lyricsString)
    FinalList = [i for i in verseList if (len(i) > 1 and i[0] != '[')]
    return " ".join(FinalList[:amount])


def getFirstVersesOfChorus(lyricsString, amount):
    List = re.split('\n', lyricsString)
    verseList = [i for i in List if len(i) > 1]
    for i in range(len(verseList)):
        if "[Chorus" in verseList[i] or "[Hook" in verseList[i]:
            return " ".join(verseList[i + 1:i + amount + 1])
    return getFirstVerses(lyricsString, amount)


def getRandomVerses(lyricsString, amount):
    verseList = re.split('\n', lyricsString)
    FinalList = [i for i in verseList if (len(i) > 1 and i[0] != '[')]
    rd = random.randint(0, len(FinalList) - amount)
    return " ".join(FinalList[rd:rd + amount])

In [152]:
# Functions to Degrade message

#Function to create typo by neighbouring letter
NeighbouringKeys = {
    'q': "qwas",
    'w': "qwase",
    'e': "wsedr",
    'r': "edrft",
    't': "rftgy",
    'y': "tgyhu",
    'u': "yhuji",
    'i': "ujiko",
    'o': "ikolp",
    'p': "olp",
    'a': "qwasz",
    's': "wazsxed",
    'd': "sxedcrf",
    'f': "dcrfvtg",
    'g': "fvtgbyh",
    'h': "gbyhnuj",
    'j': "hnujmik",
    'k': "jmikol",
    'l': "kolp",
    'z': "azsx",
    'x': "zsxdc",
    'c': "xdcfv",
    'v': "cfvgb",
    'b': "vgbhn",
    'n': "bhnjm",
    'm': "njmk"
}

englishLetters = NeighbouringKeys.keys()


def typos(text, prob=0.01):
    resultingText = ""

    for letter in text:
        if not letter in englishLetters:
            newLetter = letter
        else:
            if random.random() < prob:
                newLetter = random.choice(NeighbouringKeys[letter])
            else:
                newLetter = letter
        resultingText += newLetter

    return resultingText


#Function to (maybe) invert 2 adjacent letters (do force=True to force it to happen)
def invertAdjacentLetters(text, force=False):
    rd = random.randint(0, len(text) - 2)
    if not force:
        if text[rd] in englishLetters and text[rd + 1] in englishLetters:
            return text[:rd] + text[rd + 1] + text[rd] + text[rd + 2:]
        else:
            return text
    else:
        while not (text[rd] in englishLetters and text[rd + 1] in englishLetters):
            rd = random.randint(0, len(text) - 2)
        return text[:rd] + text[rd + 1] + text[rd] + text[rd + 2:]


#Function to (maybe) remove a letter (do force=True to force it to happen)
def removeLetter(text, force=False):
    rd = random.randint(0, len(text) - 1)
    if not force:
        if text[rd] in englishLetters:
            return text[:rd] + text[rd + 1:]
        else:
            return text
    else:
        while not (text[rd] in englishLetters):
            rd = random.randint(0, len(text) - 1)
        return text[:rd] + text[rd + 1:]


#Function to (maybe) double a letter (do force=True to force it to happen)
def doubleLetter(text, force=False):
    rd = random.randint(0, len(text) - 1)
    if not force:
        if text[rd] in englishLetters:
            return text[:rd + 1] + text[rd] + text[rd + 1:]
        else:
            return text
    else:
        while not (text[rd] in englishLetters):
            rd = random.randint(0, len(text) - 1)
        return text[:rd + 1] + text[rd] + text[rd + 1:]


CommonMisspelling = {
    "absence": ["absense", "absentse", "abcense", "absance"],
    "acceptable": ["acceptible"],
    "their": ["there", "they're"],
    "there": ["their", "they're"],
    "they're": ["their", "there"],
    "your": ["you're"],
    "you're": ["your"]
}


# Add a common misspelling
def addCommonMisspell(text):
    for word in CommonMisspelling.keys():
        if word in text:
            return text.replace(word, random.choice(CommonMisspelling[word]))
    return text


In [171]:
def generate_qrels(df, n):
    # Add 'weight' column
    max_views = max(df["views"])
    df["weight"] = (df["views"] / max_views) ** 0.5 * 0.5 + 0.1

    df_sampled = df.sample(n // 2, weights='weight')

    def generate_positive_qrel(document):
        text = document['lyrics']
        rd = random.random()
        if rd < 0.6:
            query = getFirstVersesOfChorus(text, random.randint(1, 2))
        elif rd < 0.9:
            query = getFirstVerses(text, random.randint(1, 2))
        else:
            query = getRandomVerses(text, random.randint(1, 2))

        if random.randint(0, 3) == 0:
            query = addCommonMisspell(query)
        query = typos(query)

        for j in range(len(query)):
            rand = random.randint(0, 50)
            if rand == 0:
                query = invertAdjacentLetters(query)
            elif rand == 1:
                query = removeLetter(query)
            if rand == 2:
                query = doubleLetter(query)

        doc_id = document.name
        return pd.Series([query, doc_id, 1], index=['text', 'doc_id', 'relevance'])

    def generate_negative_qrel(positive_qrel):
        original_doc_id = positive_qrel['doc_id']
        negative_doc_id = original_doc_id
        while negative_doc_id == original_doc_id:
            negative_doc_id = df.sample(1).iloc[0].name

        return pd.Series([positive_qrel['text'], negative_doc_id, 0])

    positive_qrels = df_sampled.apply(generate_positive_qrel, axis=1, result_type='expand')
    negative_qrels = positive_qrels.apply(generate_negative_qrel, axis=1, result_type='broadcast')

    return pd.concat([positive_qrels, negative_qrels]).reset_index(drop=True)

In [196]:
# Save qrels as pickle file
total_qrels = 1000
#total_qrels = 100000
qrels_train = generate_qrels(df, int(total_qrels * 0.9))
torch.save(qrels_train, 'qrels-train.pkl')

qrels_test = generate_qrels(df, int(total_qrels * 0.1))
torch.save(qrels_test, 'qrels-test.pkl')

In [182]:
qrels_train

Unnamed: 0,text,doc_id,relevance
0,Intro: Syn,7865,1
1,"Brand Nubian baby, heere to lip it again And y...",11134,1
2,From a nickel and dime ass nigga,8475,1
3,"Verse 1 (Killa Tay) We pump lugs, and punk thu...",11702,1
4,(I) Fuck with your osul likee ethher,230,1
...,...,...,...
9995,"All in together now (no, now, now) What are yo...",9078,0
9996,"Bling bling, every ttime I come around your ci...",1016,0
9997,Ha Huh mann Im trippiin out right now,2695,0
9998,All the ganggstas they gon' ride to tis,1822,0


# TF-IDF

In [None]:
tfidf_vectorizer = TfidfVectorizer()
tfidf_matrix = tfidf_vectorizer.fit_transform(df_proc['text'])

feature = tfidf_vectorizer.get_feature_names_out()

In [None]:
query = "Take it easy with me, please"
query_vec = tfidf_vectorizer.transform([query])

results = cosine_similarity(tfidf_matrix,query_vec)

for i in results.argsort()[-10:][::-1]:
    print(df.iloc[i,0],"--",df.iloc[i,1])

In [31]:
doc_vector = tfidf_matrix[0].toarray()
#df with words and their tf-idf values
df_tfidf = pd.DataFrame(list(zip(feature, doc_vector.flatten())), columns=['Word', 'TF-IDF'])

df_tfidf = df_tfidf.sort_values(by='TF-IDF', ascending=False)
print(df_tfidf)

            Word    TF-IDF
10769        cam  0.800548
34638      killa  0.588214
56995       sing  0.042899
13152       clap  0.041008
65962        uhh  0.018107
...          ...       ...
24039   foodmart  0.000000
24040  foodstamp  0.000000
24041      fooey  0.000000
24042       foof  0.000000
72003        𝑤𝑎𝑠  0.000000

[72004 rows x 2 columns]


# Word2Vec


In [None]:
# TODO this doesn't work. Remove from final file?
from gensim.models import Word2Vec

print(df_proc.columns)

In [None]:
# Extract the tokenized lyrics as a list of lists
corpus = df_proc['text'].apply(lambda x: x.split()).tolist()

# Train Word2Vec model
w2v_model = Word2Vec(sentences=corpus, vector_size=100, window=5, min_count=1, workers=4)

In [None]:
def average_word_vectors(tokens, model, num_features):
    feature_vector = np.zeros((num_features,), dtype="float32")
    n_words = 0
    for token in tokens:
        if token in model.wv:
            n_words += 1
            feature_vector = np.add(feature_vector, model.wv[token])
    if n_words > 0:
        feature_vector = np.divide(feature_vector, n_words)
    return feature_vector


df_proc['doc_vectors'] = [average_word_vectors(tokens, w2v_model, 100) for tokens in corpus]

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

#find similar songs to the first one.
similarity_matrix = cosine_similarity(df_proc['doc_vectors'].tolist(), [df_proc['doc_vectors'].iloc[0]])
similar_songs_indices = np.argsort(similarity_matrix[:, 0])[::-1]

top_similar_songs = df_proc.iloc[similar_songs_indices[1:10]][['title', 'text']]
print(top_similar_songs)
#i am just trying things out rn

In [None]:
def generate_training_data(corpus, window_size, vocab_size, word_to_index, length_of_corpus, sample=None):
    training_data = []
    training_sample_words = []
    for i, word in enumerate(corpus):

        index_target_word = i
        target_word = word
        context_words = []

        #when target word is the first word
        if i == 0:

            # trgt_word_index:(0), ctxt_word_index:(1,2)
            context_words = [corpus[x] for x in range(i + 1, window_size + 1)]

            #when target word is the last word
        elif i == len(corpus) - 1:

            # trgt_word_index:(9), ctxt_word_index:(8,7), length_of_corpus = 10
            context_words = [corpus[x] for x in range(length_of_corpus - 2, length_of_corpus - 2 - window_size, -1)]

        #When target word is the middle word
        else:

            #Before the middle target word
            before_target_word_index = index_target_word - 1
            for x in range(before_target_word_index, before_target_word_index - window_size, -1):
                if x >= 0:
                    context_words.extend([corpus[x]])

            #After the middle target word
            after_target_word_index = index_target_word + 1
            for x in range(after_target_word_index, after_target_word_index + window_size):
                if x < len(corpus):
                    context_words.extend([corpus[x]])

        trgt_word_vector, ctxt_word_vector = get_one_hot_vectors(target_word, context_words, vocab_size, word_to_index)
        training_data.append([trgt_word_vector, ctxt_word_vector])

        if sample is not None:
            training_sample_words.append([target_word, context_words])

    return training_data, training_sample_words

Based on this code: https://github.com/rahul1728jha/Word2Vec_Implementation/blob/master/Word_2_Vec.ipynb

In [None]:
# Define parameters
window_size = 5
vocab_size = len(your_vocabulary)
word_to_index = your_word_to_index_dict
length_of_corpus = len(corpus)

# Call the function to generate training data
training_data, training_sample_words = generate_word_similarity_training_data(corpus, window_size, vocab_size, word_to_index,
                                                                              length_of_corpus, sample=None)

word2vec_model = Word2Vec(sentences=your_training_data, vector_size=your_vector_size, window=your_window_size, min_count=1, workers=4)

In [None]:
def cosine_similarity_word(word1, word2, model):
    vector1 = model.wv[word1]
    vector2 = model.wv[word2]
    similarity = np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2))
    return similarity

In [None]:
similarity_score = cosine_similarity_word('word1', 'word2', word2vec_model)
print(f"Similarity between 'word1' and 'word2': {similarity_score}")

# monoBERT
Based on [github.com/veneres/ltr-emb-analysis](https://github.com/veneres/ltr-emb-analysis)

In [2]:
batch_size = 4

print(f"{batch_size=}")

num_epochs = 1

print(f"{num_epochs=}")

num_training_steps = int(1e5)

print(f"{num_training_steps=}")

save_checkpoint_after_steps = 1000

print(f"{save_checkpoint_after_steps=}")

docs_dataset_path = "./documents.pkl"
qrels_dataset_path = "./qrels-train.pkl"

output_dir = "out"

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

batch_size=4
num_epochs=1
num_training_steps=100000
save_checkpoint_after_steps=1000


NameError: name 'torch' is not defined

In [None]:
# Load model and tokenizer
model_name = "bert-base-uncased"
model = AutoModelForSequenceClassification.from_pretrained(model_name)


tokenizer = AutoTokenizer.from_pretrained(model_name)

In [None]:
class DatasetLoader(Dataset):
    def __init__(self, tokenizer, documents_path: str, qrels_path: str):
        self.documents = torch.load(documents_path)
        self.qrels = torch.load(qrels_path)
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.qrels)

    def get_labels(self):
        return self.qrels[:, 2]

    def __getitem__(self, idx):
        qrel = self.qrels.iloc[idx]

        query_tokens = qrel[0]
        doc_id = qrel[1]
        relevance = qrel[2]

        rel_docs_tokens = self.documents.loc[self.documents.index == doc_id].iloc[0]["text"]

        #print(f"{query_tokens=}")
        #print(f"{rel_docs_tokens=}")

        tokenized_text = self.tokenizer(
            query_tokens,
            rel_docs_tokens,
            return_tensors="pt",
            padding="max_length",
            truncation=True
        )
        tokenized_text = {k: v[0] for k, v in tokenized_text.items()}
        #tensor_qid = torch.tensor(int(query_id))
        #"query_id": tensor_qid, 
        tensor_did = torch.tensor(int(doc_id))
        return {**tokenized_text, "labels": relevance, "doc_id": tensor_did}

gen = torch.Generator()
gen.manual_seed(2147483647)

train_dataset = DatasetLoader(tokenizer, docs_dataset_path, qrels_dataset_path)

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, generator=gen)

In [None]:
optimizer = AdamW(model.parameters(), lr=1e-6)

num_warmup_steps = 1e4

print(f"{num_warmup_steps=}")

lr_scheduler = get_scheduler(
    name="linear", optimizer=optimizer, num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps
)

In [None]:
model.to(device)

model.train()

step_n = 0
window_loss = []
for epoch in range(num_epochs):
    with tqdm(train_dataloader, unit="batch", total=min(num_training_steps, len(train_dataloader))) as tepoch:
        #print(next(iter(tepoch)))

        for batch in tepoch:
            tepoch.set_description(f"Epoch {epoch}")
            batch_forward = {k: v.to(device) for k, v in batch.items() if k not in ["doc_id", "labels"]}
            outputs = model(**batch_forward)
            logits = outputs.logits
            cel_w = nn.CrossEntropyLoss().to(device)
            loss_w = cel_w(logits.to(device), batch["labels"].to(device))
            loss_w.backward()

            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()
            window_loss.append(loss_w.item())
            if len(window_loss) < 100:
                tepoch.set_postfix(loss="---")
            else:
                tepoch.set_postfix(loss=np.mean(window_loss))
                window_loss = window_loss[1:]
            step_n += 1
            if step_n % save_checkpoint_after_steps == 0:
                output_dir_step = output_dir/str(step_n)
                print(f"Saving checkpoint in: {output_dir_step}")
                model.save_pretrained(output_dir_step)

            if step_n == num_training_steps:
                break

output_dir_step = output_dir / str(step_n)
model.save_pretrained(output_dir_step)

# Pipeline

In [ ]:
# Retrieval pipeline
def retrieve(query, df, model, tokenizer, top_k=10):
    query = query.lower()
    query = query.replace(re.compile(r"[^\w\s]"), "", regex=True)
    query = query.rsplit()
    query = " ".join([ps.stem(token) for token in query if token not in stopwords_en])

    # TF-IDF
    query_vector = tfidf_vectorizer.transform([query]).toarray()

    # monoBERT
    for
        query_tokens = tokenizer(query, return_tensors="pt", padding="max_length", truncation=True)
    query_tokens = {k: v[0].to(device) for k, v in query_tokens.items()}

    model.eval()
    with torch.no_grad():
        outputs = model(**query_tokens)
        logits = outputs.logits
        softmax = nn.Softmax(dim=1)
        probs = softmax(logits)
        probs = probs[:, 1]
        probs = probs.cpu().numpy()

    # 

    return top_similar_songs