# Music Retrieval - A Boolean Retrieval Approach

This notebook presents a system for retrieving songs by running Boolean queries against their lyrics.

In [1]:
# Load libraries
## Python version is 3.11.6
import pandas as pd
pd.set_option('display.max_columns', None)
import nltk
nltk.download('wordnet')
nltk.download('stopwords')
import string
from functools import total_ordering
import re
import pickle

[nltk_data] Downloading package wordnet to /home/akasnipe/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/akasnipe/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


The dataset contains lyrics of songs in the English language, from 1950 to 2019.

In [2]:
data = pd.read_csv('data/spotify_millsongdata.csv', sep=",")
data.head(5)

Unnamed: 0,artist,song,link,text
0,ABBA,Ahe's My Kind Of Girl,/a/abba/ahes+my+kind+of+girl_20598417.html,"Look at her face, it's a wonderful face \nAnd..."
1,ABBA,"Andante, Andante",/a/abba/andante+andante_20002708.html,"Take it easy with me, please \nTouch me gentl..."
2,ABBA,As Good As New,/a/abba/as+good+as+new_20003033.html,I'll never know why I had to go \nWhy I had t...
3,ABBA,Bang,/a/abba/bang_20598415.html,Making somebody happy is a question of give an...
4,ABBA,Bang-A-Boomerang,/a/abba/bang+a+boomerang_20002668.html,Making somebody happy is a question of give an...


Only the `artist`, `song` and `text` columns will be used for the retrieval.

In [3]:
data = data[['artist', 'song', 'text']]
data.head(5)

Unnamed: 0,artist,song,text
0,ABBA,Ahe's My Kind Of Girl,"Look at her face, it's a wonderful face \nAnd..."
1,ABBA,"Andante, Andante","Take it easy with me, please \nTouch me gentl..."
2,ABBA,As Good As New,I'll never know why I had to go \nWhy I had t...
3,ABBA,Bang,Making somebody happy is a question of give an...
4,ABBA,Bang-A-Boomerang,Making somebody happy is a question of give an...


### IR System

The system will be composed of the following classes:
* Posting: The class to implement the Posting objects,
* Posting List: The class to implement the Posting List objects,
* Term: The class to implement the Term objects,
* Inverted Index: The class to implement the Inverted Index objects for Boolean Retrieval,
* Song: The class to implement the Song objects,
* IR System: The "main" class that puts everything together.
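
To make the roles of these classes concrete, here is a minimal sketch of an inverted index built from plain Python containers. The names `toy_corpus` and `toy_index` are illustrative and not part of the notebook; the position of each string in the list stands in for the document ID.

```python
from collections import defaultdict

# Toy corpus (hypothetical): the list position plays the role of the document ID.
toy_corpus = [
    "look at her face",
    "take it easy with me",
    "look at me",
]

# Map each term to the sorted list of document IDs that contain it.
toy_index = defaultdict(list)
for doc_id, lyrics in enumerate(toy_corpus):
    # set() keeps a term from being recorded twice for the same document.
    for token in sorted(set(lyrics.split())):
        toy_index[token].append(doc_id)

print(toy_index["look"])  # → [0, 2]
print(toy_index["me"])    # → [1, 2]
```

The classes below wrap the same idea: each `Term` pairs a word with a `PostingList`, and each `Posting` holds one document ID.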

In [4]:
# Posting class

@total_ordering
class Posting:
    
    # Initializer, takes a document ID as an argument.
    def __init__(self, docID):
        self._docID = docID
    
    # Retrieve a document's contents from a corpus using the document ID.
    def get_from_corpus(self, corpus):
        return corpus[self._docID]
    
    # Check equality with another Posting, based on document ID.
    def __eq__(self, other):
        return self._docID == other._docID
    
    # Check if this Posting has document ID greater than another Posting.
    def __gt__(self, other):
        return self._docID > other._docID
    
    # Provide the string representation of the Posting.
    def __repr__(self):
        return str(self._docID)
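
`functools.total_ordering` derives the remaining comparison operators from `__eq__` plus one ordering method, which is why `Posting` only needs `__eq__` and `__gt__`. The sketch below uses a hypothetical `DocRef` class as a stand-in for `Posting` to show the derived operators at work.

```python
from functools import total_ordering

@total_ordering
class DocRef:  # hypothetical stand-in for Posting
    def __init__(self, doc_id):
        self.doc_id = doc_id

    def __eq__(self, other):
        return self.doc_id == other.doc_id

    def __gt__(self, other):
        return self.doc_id > other.doc_id

a, b = DocRef(3), DocRef(7)

# <, <= and >= are not defined above; total_ordering supplies them.
print(a < b, a <= b, a >= b)  # → True True False

# sorted() relies on <, so it works as well.
print([ref.doc_id for ref in sorted([b, a])])  # → [3, 7]
```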

In [5]:
# Posting List class

class PostingList:

    # Initializer, initializes an empty list of postings.
    def __init__(self):
        self._postings = []
    
    # Create a PostingList instance with a single Posting from a document ID.
    @classmethod
    def from_docID(cls, docID):
        posting_list = cls()
        posting_list._postings = [Posting(docID)]
        return posting_list
    
    # Create a PostingList instance from an existing list of Postings.
    @classmethod
    def from_posting_list(cls, postingList):
        plist = cls()
        plist._postings = postingList
        return plist

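    # Merge another PostingList into this one, skipping postings in "other" that duplicate this list's last posting.
    # Assumes this list is non-empty and that both lists are sorted by document ID.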
    def merge(self, other):
        i = 0  # Index for the other PostingList.
        last = self._postings[-1]  # The last Posting in the current list.

        while (i < len(other._postings) and last == other._postings[i]):
            i += 1  # Increment the index if a duplicate is found.
        self._postings += other._postings[i:]  # Append the non-duplicate postings from the other list.
    
    # Compute the intersection of this PostingList with another.
    def intersection(self, other):
        intersection = []  # Start with an empty list for the intersection.
        i = 0  # Index for this PostingList.
        j = 0  # Index for the other PostingList.

        while (i < len(self._postings) and j < len(other._postings)): # Loop until one of the lists is exhausted.
            if (self._postings[i] == other._postings[j]):
                intersection.append(self._postings[i]) # If both postings are equal, add to the intersection.
                i += 1 # Increment both indexes.
                j += 1
            # If postings are different, increment the index for the list with the smallest value.
            elif (self._postings[i] < other._postings[j]):
                i += 1
            else:
                j += 1
        return PostingList.from_posting_list(intersection)  # Return a new PostingList of the intersection.

    # Compute the union of this PostingList with another.
    def union(self, other):
        union = []  # Start with an empty list for the union.
        i = 0  # Index for this PostingList.
        j = 0  # Index for the other PostingList.
        while (i < len(self._postings) and j < len(other._postings)): # Loop until one of the lists is exhausted.
            if (self._postings[i] == other._postings[j]):
                union.append(self._postings[i]) # If both postings are equal, add one to the union.
                i += 1 # Increment both indexes.
                j += 1
            # If postings are different, add the posting with the smallest value to the union and increment its index.
            elif (self._postings[i] < other._postings[j]):
                union.append(self._postings[i])
                i += 1
            else:
                union.append(other._postings[j])
                j += 1
        # Add any remaining postings from both lists to the union.
        for k in range(i, len(self._postings)):
            union.append(self._postings[k])
        for k in range(j, len(other._postings)):
            union.append(other._postings[k])
        return PostingList.from_posting_list(union)  # Return a new PostingList of the union.
    
    # Retrieve the contents of each Posting from a corpus.
    def get_from_corpus(self, corpus):
        return list(map(lambda x: x.get_from_corpus(corpus), self._postings))
    
    # Provide the string representation of the PostingList.
    def __repr__(self):
        return ", ".join(map(str, self._postings))
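
`PostingList` offers `intersection` and `union`, both as linear merges over sorted lists. A NOT operator can be written in the same style. The function below is a sketch on plain sorted lists of integers; `difference_sorted` is a hypothetical helper, not a method of the class above, and it assumes neither list contains duplicates.

```python
def difference_sorted(left, right):
    """Return the items of sorted list `left` that do not occur in sorted list `right`."""
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        if left[i] == right[j]:
            # Present in both lists: drop it.
            i += 1
            j += 1
        elif left[i] < right[j]:
            # left[i] cannot appear later in `right`, so keep it.
            result.append(left[i])
            i += 1
        else:
            j += 1
    # Whatever is left in `left` has no counterpart in `right`.
    result.extend(left[i:])
    return result

print(difference_sorted([1, 3, 5, 8], [3, 4, 8, 9]))  # → [1, 5]
```

Like the two methods above, this visits each posting at most once, so the cost grows with the combined length of the two lists.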

In [6]:
# Term class

# Exception class for handling merge operation errors.
class ImpossibleMergeError(Exception):
    pass

# A class that represents a term in a document, along with its posting list.
@total_ordering
class Term:

    # Initializer, takes a term and a document ID as arguments.
    def __init__(self, term, docID):
        self.term = term
        # Initialize posting_list for the term with a PostingList created from the given document ID.
        self.posting_list = PostingList.from_docID(docID)

    # Merge another Term's posting list into this one if they have the same term.
    def merge(self, other):
        if (self.term == other.term):
            self.posting_list.merge(other.posting_list)
        else:
            raise ImpossibleMergeError
    
    # Check equality with another Term.
    def __eq__(self, other):
        return self.term == other.term
    
    # Determine if this Term is greater than another.
    def __gt__(self, other):
        return self.term > other.term
    
    # Provide the string representation of the Term.
    def __repr__(self):
        return self.term + ": " + repr(self.posting_list)
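
`Term.merge` delegates to `PostingList.merge`, which only compares against the last stored posting. That is enough as long as document IDs arrive in non-decreasing order, so a word repeated within one song produces the same ID as the one stored last. A simplified sketch of this idea on a plain list of integers (the helper `append_doc_id` is hypothetical):

```python
def append_doc_id(postings, doc_id):
    """Append doc_id unless it equals the last stored ID.

    Assumes IDs are passed in non-decreasing order.
    """
    if not postings or postings[-1] != doc_id:
        postings.append(doc_id)
    return postings

postings = []
# The same word seen twice in document 0, three times in document 2, once in document 5.
for doc_id in [0, 0, 2, 2, 2, 5]:
    append_doc_id(postings, doc_id)

print(postings)  # → [0, 2, 5]
```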

Before defining the Inverted Index class, let's define functions to perform stop-word removal, normalization, stemming and lemmatization.

In [7]:
# Stop Word removal, Normalization and Stemming/Lemmatization

def remove_stop_words(text):

    # Build the stop-word set once instead of once per token
    stop_words = set(nltk.corpus.stopwords.words('english'))

    # Start from a list containing the tokens in "text"
    text_list = text.split()

    # Filter out stop words; compare in lowercase because this runs before normalize()
    text_list = [word for word in text_list if word.lower() not in stop_words]

    # Join the remaining words into a single string
    result = " ".join(text_list)

    return result

def normalize(text):

    # Make a translation table that maps all punctuation characters to None
    translator = str.maketrans("", "", string.punctuation)

    # Apply the translation table to the input string
    result = text.translate(translator)

    # Converts the text to lowercase.
    result = result.lower()

    return result

def stem(text, type='porter'):
        
    # Start from a list containing the tokens in "text"
    stemmed_text = text.split()

    # Create a stemmer object
    if type == 'porter':
        stemmer = nltk.stem.porter.PorterStemmer()
    elif type == 'snowball':
        stemmer = nltk.stem.snowball.SnowballStemmer("english")
    else:
        raise ValueError('Stemmer type not supported')

    # Loop through each word in the text and retrieve the stem
    for i in range(len(stemmed_text)):
        stemmed_text[i] = stemmer.stem(stemmed_text[i])

    # Join the stemmed words into a single string
    result = " ".join(stemmed_text)

    return result

def lemmatize(text):

    # Start from a list containing the tokens in "text"
    lemmatized_text = text.split()

    # Create a lemmatizer object
    lemmatizer = nltk.stem.WordNetLemmatizer()

    # Loop through each word in the text and retrieve the lemma
    for i in range(len(lemmatized_text)):
        lemmatized_text[i] = lemmatizer.lemmatize(lemmatized_text[i])

    # Join the lemmatized words into a single string
    result = " ".join(lemmatized_text)

    return result
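
To see what the normalization step does to a line of lyrics, the sketch below repeats the two steps of `normalize` (strip punctuation, then lowercase) using only the standard library. It is named `normalize_sketch` to keep it apart from the function above. Because apostrophes count as punctuation, contractions lose them.

```python
import string

def normalize_sketch(text):
    # Drop every character listed in string.punctuation, then lowercase.
    translator = str.maketrans("", "", string.punctuation)
    return text.translate(translator).lower()

line = "Look at her face, it's a wonderful face"
print(normalize_sketch(line))  # → look at her face its a wonderful face
```

Queries go through the same step, so `it's` in a query becomes `its` and matches the indexed form.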

In [8]:
# Inverted Index class

class InvertedIndex:
    
    # Initialize the inverted index with an empty list of terms.
    def __init__(self):
        self._dictionary = []
        
    # Create an inverted index from a corpus of documents.
    ## word_reduction_type selects the word reduction: 'stemming_porter', 'stemming_snowball' or 'lemmatization'.
    ## stop_words=True keeps stop words in the index; stop_words=False removes them.
    @classmethod
    def from_corpus(cls, corpus, word_reduction_type = 'stemming_porter', stop_words = True):
        intermediate_dict = {}  # Intermediate dictionary to store the terms and their postings.
        for docID, song in enumerate(corpus):
            # Remove stop words, normalize and stem/lemmatize
            document = song.lyrics
            if not stop_words:
                document = remove_stop_words(document)
            document = normalize(document)
            if word_reduction_type == 'stemming_porter':
                document = stem(document, type = 'porter')
            elif word_reduction_type == 'stemming_snowball':
                document = stem(document, type = 'snowball')
            elif word_reduction_type == 'lemmatization':
                document = lemmatize(document)
            tokens = list(document.split()) # Tokenize the document into individual words.
            biwords = [tokens[i]+' '+tokens[i+1] for i in range(len(tokens)-1)] # Get all biwords in the document.
            for token in tokens:
                term = Term(token, docID) # Create a new term with the token and the current document ID.
                try: # Try to merge the term with existing one in the intermediate dictionary.
                    intermediate_dict[token].merge(term)
                except KeyError: # If the term is not already in the dictionary, add it.
                    intermediate_dict[token] = term
            for biword in biwords:
                term = Term(biword, docID) # Create a new term with the biword and the current document ID.
                try: # Try to merge the term with existing one in the intermediate dictionary.
                    intermediate_dict[biword].merge(term)
                except KeyError: # If the term is not already in the dictionary, add it.
                    intermediate_dict[biword] = term
        idx = cls() # Create a new InvertedIndex instance.
        idx._dictionary = sorted(intermediate_dict.values(), key=lambda term: term.term) # Sort the terms in the intermediate dictionary and store them in the index's dictionary.
        return idx
    
    # Retrieve the posting list for a given term.
    def __getitem__(self, key):
        for term in self._dictionary:
            if term.term == key: # If the term matches the key, return its posting list.
                return term.posting_list
        raise KeyError("No song matches the given query.") # If the term is not in the dictionary, raise a KeyError.
    
    # Provide a string representation of the inverted index.
    def __repr__(self):
        return "A dictionary with " + str(len(self._dictionary)) + " terms"
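
Besides single tokens, `from_corpus` indexes every pair of adjacent tokens (biwords), which is what lets a two-word query operand match only songs where the two words appear next to each other. A minimal sketch of the biword step, with a hypothetical helper name:

```python
def biwords(tokens):
    """Return every pair of adjacent tokens joined by a single space."""
    return [first + " " + second for first, second in zip(tokens, tokens[1:])]

tokens = "look at her face".split()
print(biwords(tokens))  # → ['look at', 'at her', 'her face']
```

A document with n tokens yields n - 1 biwords, so indexing them increases the size of the dictionary considerably compared with single tokens alone.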

In [9]:
# Song class

# Class to hold the title, author and lyrics of a song
class Song:
    
    # Initializer, sets the title, author and lyrics attributes.
    def __init__(self, title, author, lyrics):
        self.title = title
        self.author = author
        self.lyrics = lyrics
        
    # Provide the string representation of the Song object.
    def __repr__(self):
        return "Title: " + self.title + ",\nAuthor: " + self.author + "\n\n"
    
# Get song author, title and lyrics from data
def get_songs_data(path):
    data = pd.read_csv(path, sep=",")
    # Replace line breaks (\r\n or \n) in the song lyrics with spaces
    data['text'] = data['text'].replace(r'\r?\n', ' ', regex=True)
    corpus = []
    for index, item in data.iterrows():
        song = Song(title = item['song'],
                    author = item['artist'],
                    lyrics = item['text'])
        # Add the Song object to the corpus.
        corpus.append(song)
    # Return the populated list of Song objects.
    return corpus


In [10]:
# Information Retrieval (IR) system class

class IRsystem:

    # Initialize the IR system with a corpus and the inverted index.   
    def __init__(self, corpus, index):
        self._corpus = corpus
        self._index = index
    
    # Create an IR system instance from a given corpus.
    @classmethod
    def from_corpus(cls, corpus, word_reduction_type = 'stemming_porter', stop_words=True):
        index = InvertedIndex.from_corpus(corpus, word_reduction_type, stop_words)
        return cls(corpus, index)
    
    # Return the songs whose lyrics contain a given term (word or biword).
    def get_posting_list(self, term):
        # Retrieve the term's posting list from the index.
        posting_list = self._index[term]
        # Return the corresponding Song objects from the corpus.
        return posting_list.get_from_corpus(self._corpus)
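
Every call to `get_posting_list` goes through `InvertedIndex.__getitem__`, which scans the dictionary from the start. Since the dictionary is kept sorted by term, a binary search is a possible alternative. The sketch below works on a plain sorted list of strings; `find_term` is a hypothetical helper, not part of the notebook.

```python
from bisect import bisect_left

def find_term(sorted_terms, key):
    """Return the position of key in sorted_terms, or -1 if it is absent."""
    pos = bisect_left(sorted_terms, key)
    if pos < len(sorted_terms) and sorted_terms[pos] == key:
        return pos
    return -1

terms = sorted(["idiot", "american", "american idiot", "media", "punchline"])
print(terms)
print(find_term(terms, "media"))  # → 3
print(find_term(terms, "zebra"))  # → -1
```

With a dictionary of n terms this needs about log2(n) comparisons per lookup instead of up to n.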

In [11]:
# Function to execute a text query against an IR system.
# Operators are applied strictly left to right, without precedence:
# "a AND b OR c" means "(a AND b) OR c", and "a NOT b" means "a AND NOT b".

def query(ir, query, word_reduction_type = 'stemming_porter'):
    # Split the text query into words/biwords and boolean operators.
    # The word boundaries (\b) keep the split from firing inside longer uppercase words.
    words = [word.strip() for word in re.split(r'\b(AND|OR|NOT)\b', query)]
    # After the split, terms sit at even positions and operators at odd positions,
    # so a leading, trailing or doubled operator shows up as an empty term.
    if any(words[i] == "" for i in range(0, len(words), 2)):
        raise ValueError("Every boolean operator needs a term on both sides.")
    # Normalize and stem/lemmatize the query words/biwords but not the boolean operators.
    for i in range(0, len(words), 2):
        if word_reduction_type == 'stemming_porter':
            words[i] = stem(normalize(words[i]), type = 'porter')
        elif word_reduction_type == 'stemming_snowball':
            words[i] = stem(normalize(words[i]), type = 'snowball')
        elif word_reduction_type == 'lemmatization':
            words[i] = lemmatize(normalize(words[i]))
    # Look up the songs for a term; a term missing from the index matches no song.
    def lookup(term):
        try:
            return ir.get_posting_list(term)
        except KeyError:
            return []
    # Start from the songs that match the first word/biword.
    answer = set(lookup(words[0]))
    # Loop through the remaining (operator, term) pairs in the query.
    for i in range(1, len(words), 2):
        # Retrieve the songs for the next word/biword from the index.
        result = lookup(words[i+1])
        # Case AND: Intersect the current answer with the new songs.
        if words[i] == "AND":
            answer = answer.intersection(result)
        # Case OR: Union the current answer with the new songs.
        elif words[i] == "OR":
            answer = answer.union(result)
        # Case NOT: Subtract the new songs from the current answer.
        elif words[i] == "NOT":
            answer = answer.difference(result)
    # If no song matches the query, raise an error; otherwise print each matching song.
    if len(answer) == 0:
        raise KeyError("No song matches the given query.")
    for song in answer:
        print(song)
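
Because `query` applies the operators from left to right without precedence, the order of the operands can change the result. The sketch below reproduces that evaluation order on sets of integer IDs. The `evaluate` function and `toy_postings` are hypothetical, and treating an unknown term as an empty set is an assumption of this sketch.

```python
import re

def evaluate(query_text, postings):
    """Evaluate a flat boolean query left to right over a dict of term -> set of IDs."""
    parts = [part.strip() for part in re.split(r'\b(AND|OR|NOT)\b', query_text)]
    # Copy the first set so the in-place operations below do not modify `postings`.
    answer = set(postings.get(parts[0], set()))
    for i in range(1, len(parts), 2):
        operand = postings.get(parts[i + 1], set())
        if parts[i] == "AND":
            answer &= operand
        elif parts[i] == "OR":
            answer |= operand
        else:  # NOT behaves as AND NOT
            answer -= operand
    return answer

toy_postings = {
    "american": {1, 2, 3},
    "idiot": {2, 3, 4},
    "media": {3},
    "punchline": {9},
}

print(sorted(evaluate("american AND idiot NOT media", toy_postings)))     # → [2]
print(sorted(evaluate("american AND idiot OR punchline", toy_postings)))  # → [2, 3, 9]
print(sorted(evaluate("punchline OR american AND idiot", toy_postings)))  # → [2, 3]
```

The last two queries contain the same operands and operators, yet the second one drops document 9 because the final AND is applied to the whole intermediate result.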

Let's test the Boolean Retrieval System.

In [17]:
corpus = get_songs_data("data/spotify_millsongdata.csv")

ir = IRsystem.from_corpus(corpus)

In [24]:
# Save the IR system to a file
with open('IRSystem/irsystem.pkl', 'wb') as output:
    pickle.dump(ir, output, pickle.HIGHEST_PROTOCOL)

In [12]:
# Load the IR system from a file (the classes defined above must already be in scope)
with open('IRSystem/irsystem.pkl', 'rb') as file:
    ir = pickle.load(file)

In [13]:
query(ir, "american AND idiot")

Title: Irresponsible Hate Anthem,
Author: Marilyn Manson


Title: American Idiot (Greenday Cover),
Author: Avril Lavigne


Title: Raleigh Soliloquy Pt. Ii,
Author: Sublime


Title: American Idiot,
Author: Green Day




In [14]:
query(ir, "american AND idiot NOT media")

Title: Irresponsible Hate Anthem,
Author: Marilyn Manson


Title: Raleigh Soliloquy Pt. Ii,
Author: Sublime




In [15]:
query(ir, "american AND idiot OR punchline")

Title: Man Machine,
Author: Robbie Williams


Title: New Years Eve,
Author: Eagles


Title: It's Time,
Author: Elvis Costello


Title: Raleigh Soliloquy Pt. Ii,
Author: Sublime


Title: American Idiot,
Author: Green Day


Title: Dancing On A High Wire,
Author: Alan Parsons Project


Title: On The Nickel,
Author: Tom Waits


Title: I'm So Sick Of You,
Author: Weird Al Yankovic


Title: Throw It Up,
Author: Yelawolf


Title: American Idiot (Greenday Cover),
Author: Avril Lavigne


Title: Main Chick,
Author: Chris Brown


Title: Irresponsible Hate Anthem,
Author: Marilyn Manson


Title: Waiting For The Punchline,
Author: Extreme


Title: Liar's Bar,
Author: Beautiful South


Title: Up In Flames,
Author: Nicki Minaj


