# Imports

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
%matplotlib inline
from tqdm import tqdm
from typing import List,Dict
from IPython.display import Image
from IPython.core.display import HTML 
from pathlib import Path

In [2]:
from nltk.tokenize import word_tokenize 
from nltk.stem.porter import PorterStemmer
import nltk
nltk.download("stopwords")
nltk.download("punkt")
from string import punctuation, ascii_lowercase
from nltk.corpus import stopwords

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\USER\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\USER\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!


# Config

In [12]:
INPUT_FILE_PATH = Path("lyrics.csv")
BOW_PATH = Path("bow.csv")
N_ROWS =  None
tqdm_n_iterations =  363*10**3//CHUNCK_SIZE + 1
COLS = [5]

In [13]:
stemmer = PorterStemmer()
stop_words = set(stopwords.words('english'))
allowed_symbols = set(l for l in ascii_lowercase)

In [None]:
def preprocess_sentence(sentence : str) -> List[str]:
    output_sentence = []
    for word in word_tokenize(sentence):
        ### YOUR CODE HERE
        word=word.lower()
        if word not in stop_words:
            word=''.join([i for i in word if i in allowed_symbols])
            word=stemmer.stem(word)
            if len(word)>1:
                output_sentence.append(word)
        
        
        
        ### END YOUR CODE
    return output_sentence
    

def get_data_chuncks() -> List[str]:
    for i ,chunck in enumerate(pd.read_csv(INPUT_FILE_PATH, usecols = COLS, chunksize = CHUNCK_SIZE, nrows = N_ROWS)):
        chunck = chunck.values.tolist()
        yield [chunck[i][0] for i in range(len(chunck))] 

class TfIdf:
    def __init__(self):
        self.unigram_count =  Counter()
        self.bigram_count = Counter()
        self.trigram_count = Counter()
        self.document_term_frequency = Counter()
        self.word_document_frequency = {}
        self.inverted_index = {}
        self.doc_norms = {}
        self.n_docs = -1
        self.sentence_preprocesser = preprocess_sentence
        self.bow_path = BOW_PATH

    def update_counts_and_probabilities(self, sentence :List[str],document_id:int) -> None:
        sentence_len = len(sentence)
        self.document_term_frequency[document_id] = sentence_len
        for i,word in enumerate(sentence):
            ### YOUR CODE HERE
            self.unigram_count.update([word])
            if i>0: self.bigram_count.update({(sentence[i-1],word):1})
            if i>1: self.trigram_count.update({(sentence[i-2],sentence[i-1],word):1})
            if not word in self.inverted_index: self.inverted_index[word]={}
            self.inverted_index[word][document_id]=self.inverted_index[word].get(document_id,0)+1
            ### END YOUR CODE


    def fit(self) -> None:
        for chunck in tqdm(get_data_chuncks(), total = tqdm_n_iterations):
            for sentence in chunck: #sentence is a song (string)
                self.n_docs += 1 
                if not isinstance(sentence, str):
                    continue
                sentence = self.sentence_preprocesser(sentence)
                if sentence:
                    self.update_counts_and_probabilities(sentence,self.n_docs)
        self.save_bow() # bow is 'bag of words'
        self.compute_word_document_frequency()
        self.update_inverted_index_with_tf_idf_and_compute_document_norm()
             
    def compute_word_document_frequency(self):
        for word in self.inverted_index.keys():
            ### YOUR CODE HERE
            self.word_document_frequency[word]=len(self.inverted_index[word])
            ### END YOUR CODE
            
    def update_inverted_index_with_tf_idf_and_compute_document_norm(self):
        ### YOUR CODE HERE
        for word in self.inverted_index.keys():
            for doc in self.inverted_index[word].keys():
                self.inverted_index[word][doc]=self.inverted_index[word][doc]/self.document_term_frequency[doc]*np.log10(self.n_docs/self.word_document_frequency[word])
                self.doc_norms[doc]=self.doc_norms.get(doc,0)+np.square(self.inverted_index[word][doc])

        
        ### END YOUR CODE
        for doc in self.doc_norms.keys():
            self.doc_norms[doc] = np.sqrt(self.doc_norms[doc]) 
            
    def save_bow(self):
        pd.DataFrame([self.inverted_index]).T.to_csv(self.bow_path)
                
tf_idf = TfIdf()
tf_idf.fit()

 49%|████████████████████████████████████████▍                                         | 36/73 [17:38<17:05, 27.71s/it]

 DocumentRetriever


In [15]:
class DocumentRetriever:
    def __init__(self, tf_idf):
        self.sentence_preprocesser = preprocess_sentence  
        self.vocab = set(tf_idf.unigram_count.keys())
        self.n_docs = tf_idf.n_docs
        self.inverted_index = tf_idf.inverted_index
        self.word_document_frequency = tf_idf.word_document_frequency
        self.doc_norms = tf_idf.doc_norms
        
    def rank(self,query : Dict[str,int],documents: Dict[str,Counter],metric: str ) -> Dict[str, float]:
        result = {} # key: DocID , value : float , simmilarity to query
        query_len = np.sum(np.array(list(query.values())))
        ### YOUR CODE HERE
        for word in query:
            query[word]=query[word]/query_len*np.log10(self.n_docs/self.word_document_frequency[word])    
            for doc in documents[word]:
                result[doc]=result.get(doc,0)+query[word]*self.inverted_index[word][doc]
            
            
        
         ### END YOUR CODE
        if metric == 'cosine':
            ### YOUR CODE HERE
            for doc in result.keys():
                result[doc]=result[doc]/self.doc_norms[doc]
            ### END YOUR CODE
        return result
        
    
    def sort_and_retrieve_k_best(self, scores: Dict[str, float],k :int):
        ### YOUR CODE HERE 
        return [t[0] for t in Counter(scores).most_common(k)]
        ### END YOUR CODE

    
    def reduce_query_to_counts(self, query : List)->  Counter:
        ### YOUR CODE HERE
        return Counter(query)
        ### END YOUR CODE
        
        
    def get_top_k_documents(self,query : str, metric: str , k = 5) -> List[str]:
        query = self.sentence_preprocesser(query)
        query = [word for word in query if word in self.vocab] # filter nan 
        query_bow = self.reduce_query_to_counts(query)
        relavant_documents = {word : self.inverted_index.get(word) for word in query}
        ducuments_with_similarity = self.rank(query_bow,relavant_documents, metric)
        return self.sort_and_retrieve_k_best(ducuments_with_similarity,k)
        
dr = DocumentRetriever(tf_idf)

In [22]:
from IPython.display import HTML
query = "Better stop dreaming of the quiet life, 'cause it's the one we'll never know And quit running for that runaway bus 'cause those rosy days are few And stop apologizing for the things you've never done 'Cause time is short and life is cruel but it's up to us to change This town called malice"
HTML('<iframe width="560" height="315" src="https://www.youtube.com/embed/KT6ZtUbVw1M?rel=0&amp;controls=0&amp;showinfo=0" frameborder="0" allowfullscreen></iframe>')

In [None]:
for index, song in enumerate(pd.read_csv(INPUT_FILE_PATH,usecols = [5]).iloc[cosine_top_k]['lyrics']):
    sep = "#"*50
    print(F"{sep}\nsong #{index} \n{song} \n{sep}")

In [None]:
cosine_top_k = dr.get_top_k_documents(query, 'cosine')
print(cosine_top_k)
inner_product_top_k = dr.get_top_k_documents(query, 'inner_product')
print(inner_product_top_k)