# Programming Assignment 4: Information Retrieval

In this assignment you will be improving upon a rather poorly-made information retrieval system. You will build an inverted index to quickly retrieve documents that match queries and then make it even better by using term-frequency inverse-document-frequency weighting and cosine similarity to compare queries to your data set. Your IR system will be evaluated for accuracy on the correct documents retrieved for different queries and the correctly computed tf-idf values and cosine similarities.

## Data

You will be using your IR system to find relevant documents among a collection of sixty books sourced from Project Gutenberg. The training data is located in the data/ directory under the subdirectory ProjectGutenberg/. Within this directory you will see yet another directory raw/. This contains the raw text files of the sixty short stories. The data/ directory also contains the files dev_queries.txt and dev_solutions.txt. We have provided these to you as a set of development queries and their expected answers to use as you begin implementing your IR system.

## Your Task

Improve upon the given IR system by implementing the following features:

<b>Inverted Positional Index:</b> Implement an inverted index - a mapping from words to the documents in which they occur, as well as the positions in the documents for which they occur.

<b>Boolean Retrieval:</b> Implement a Boolean retrieval system, in which you return the list of documents that contain all words in a query. (Yes, you only need to support conjunctions for this assignment.)

<b>Phrase Query Retrieval:</b> Implement a system that returns the list of documents in which the full phrase appears, (ie. the words of the query appear next to each other, in the specified order). Note that at the time of retrieval, you will not have access to the original documents anymore (the documents would be turned into bag of words), so you'll have to utilize your inverted positional index to complete this part.

<b>TF-IDF:</b> Compute and store the term-frequency inverse-document-frequency value for every word-document co-occurrence:

<b>Cosine Similarity:</b> Implement cosine similarity in order to improve upon the ranked retrieval system, which currently retrieves documents based upon the Jaccard coefficient between the query and each document. <b> The reference solution uses ltc.lnn weighting for computing cosine scores, </b> so note that when computing $w_{t, q}$ (i.e. the weight for the word 𝑤 in the query) do not include the idf term or normalization. That is, $w_{t, q} = \text{tf}_{t, q} = 1 + \log_{10} count(t, q)$ or $w_{t, q} = 0$ if the term is not present in the query. The document vector would have a similar computation for $\text{tf}_{t, d}$ but would include the idf terms and normalization.


## Evaluation

Your IR system will be evaluated on a small development set of queries as well as a held-out set of queries. The development queries are encoded in the file <b>dev_queries.txt</b> and are:

- chest of drawers
- machine learning is cool
- the cold breeze
- pacific coast
- pumpkin pies
- i completed fizzbuzz

We test your system based on the five parts mentioned above: the inverted index (used both to get word positions and to get postings), boolean retrieval, phrase query retrieval, computing the correct tf-idf values, and implementing cosine similarity using the tf-idf values. The provided development queries contain some common words, some uncommon words, and the occasional non-existent word. Some of the query phrases are found verbatim in the book dataset, and some are not. Your system should be able to correctly handle all of these cases.  

### Environment Check

Before we do anything else, let's quickly check that you're running the correct
version of Python and are in the right environment!

In [31]:
import os
import sys

print("### Environment Check ###")

# Check if in the correct Conda environment
if 'CONDA_DEFAULT_ENV' in os.environ:
    current_env = os.environ['CONDA_DEFAULT_ENV']
    print(f"Current Conda environment: {current_env}")
    if current_env != "cs124":
        print("Please activate the 'cs124' Conda environment.")
else:
    print("Warning: Not in a Conda environment. Please activate a Conda environment.")

# Check Python version
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
print(f"Current Python version: {python_version}")

if python_version != "3.8":
    print("Please use Python 3.8.")




### Environment Check ###
Current Python version: 3.11
Please use Python 3.8.


If the above cell complains, it means that you're using the wrong environment
or Python version!

If so, please exit this notebook, kill the notebook server with CTRL-C, and
try running

$ conda activate cs124

then restarting your notebook server with

$ jupyter notebook

If that doesn't work, you should go back and follow the installation
instructions in PA0.

## Getting Started!

We will first start by importing some modules and setting up our IR system class.


In [19]:
import json
import os
import re
import sys
import math

from porter_stemmer import PorterStemmer

In [20]:
class IRSystem:
    def __init__(self):
        # For holding the data - initialized in read_data()
        self.titles = []
        self.docs = []
        self.vocab = []
        # For the text pre-processing.
        self.alphanum = re.compile('[^a-zA-Z0-9]')
        self.p = PorterStemmer()

    def get_uniq_words(self):
        uniq = set()
        for doc in self.docs:
            for word in doc:
                uniq.add(word)
        return uniq

    def __read_raw_data(self, dirname):
        print("Stemming Documents...")

        titles = []
        docs = []
        os.mkdir('%s/stemmed' % dirname)
        title_pattern = re.compile('(.*) \d+\.txt')

        # make sure we're only getting the files we actually want
        filenames = []
        for filename in os.listdir('%s/raw' % dirname):
            if filename.endswith(".txt") and not filename.startswith("."):
                filenames.append(filename)

        for i, filename in enumerate(filenames):
            title = title_pattern.search(filename).group(1)
            print("    Doc %d of %d: %s" % (i + 1, len(filenames), title))
            titles.append(title)
            contents = []
            f = open('%s/raw/%s' % (dirname, filename), 'r', encoding="utf-8")
            of = open('%s/stemmed/%s.txt' % (dirname, title), 'w',
                      encoding="utf-8")
            for line in f:
                # make sure everything is lower case
                line = line.lower()
                # split on whitespace
                line = [xx.strip() for xx in line.split()]
                # remove non alphanumeric characters
                line = [self.alphanum.sub('', xx) for xx in line]
                # remove any words that are now empty
                line = [xx for xx in line if xx != '']
                # stem words
                line = [self.p.stem(xx) for xx in line]
                # add to the document's conents
                contents.extend(line)
                if len(line) > 0:
                    of.write(" ".join(line))
                    of.write('\n')
            f.close()
            of.close()
            docs.append(contents)
        return titles, docs

    def __read_stemmed_data(self, dirname):
        print("Already stemmed!")
        titles = []
        docs = []

        # make sure we're only getting the files we actually want
        filenames = []
        for filename in os.listdir('%s/stemmed' % dirname):
            if filename.endswith(".txt") and not filename.startswith("."):
                filenames.append(filename)

        if len(filenames) != 60:
            msg = "There are not 60 documents in ../data/ProjectGutenberg/stemmed/\n"
            msg += "Remove ../data/ProjectGutenberg/stemmed/ directory and re-run."
            raise Exception(msg)

        for i, filename in enumerate(filenames):
            title = filename.split('.')[0]
            titles.append(title)
            contents = []
            f = open('%s/stemmed/%s' % (dirname, filename), 'r',
                     encoding="utf-8")
            for line in f:
                # split on whitespace
                line = [xx.strip() for xx in line.split()]
                # add to the document's conents
                contents.extend(line)
            f.close()
            docs.append(contents)

        return titles, docs

    def read_data(self, dirname):
        """
        Given the location of the 'data' directory, reads in the documents to
        be indexed.
        """
        # NOTE: We cache stemmed documents for speed
        #       (i.e. write to files in new 'stemmed/' dir).

        print("Reading in documents...")
        # dict mapping file names to list of "words" (tokens)
        filenames = os.listdir(dirname)
        subdirs = os.listdir(dirname)
        if 'stemmed' in subdirs:
            titles, docs = self.__read_stemmed_data(dirname)
        else:
            titles, docs = self.__read_raw_data(dirname)

        # Sort document alphabetically by title to ensure we have the proper
        # document indices when referring to them.
        ordering = [idx for idx, title in sorted(enumerate(titles),
                                                 key=lambda xx: xx[1])]

        self.titles = []
        self.docs = []
        numdocs = len(docs)
        for d in range(numdocs):
            self.titles.append(titles[ordering[d]])
            self.docs.append(docs[ordering[d]])

        # Get the vocabulary.
        self.vocab = [xx for xx in self.get_uniq_words()]

You will first build the inverted positional index (data structure that keeps track of the documents in which a particular word is contained, and the positions of that word in the document). The documents will have already been read in at this point. The following instance variables in the class are included in the starter code for you to use to build your inverted positional index: titles (a list of strings), docs (a list of lists of strings), and vocab (a list of strings). Since the majority of your work in this assignment will use document ID, we recommend doing the mapping using the document IDs (i.e. the index of the document within `self.docs`). You can then retrieve the text and title of any given document by indexing into `self.docs` and `self.titles` respectively.

In [2]:
class IRSystem:
    def __init__(self, docs, titles):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс для создания

    def index(self):
        """
        Создание инвертированного, позиционного индекса для документов.
        """
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

        # Преобразуем документы в отображение от ID к множеству слов (по желанию)
        id_to_bag_of_words = {d: set(doc.split()) for d, doc in enumerate(self.docs)}
        self.docs = id_to_bag_of_words

# Пример использования
docs = ["this is the first document", "this document is the second document"]
titles = ["Doc 1", "Doc 2"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Проверяем инвертированный индекс
print(ir_system.inv_index)



Индексация...
{'this': {0: [0], 1: [0]}, 'is': {0: [1], 1: [2]}, 'the': {0: [2], 1: [3]}, 'first': {0: [3]}, 'document': {0: [4], 1: [1, 5]}, 'second': {1: [4]}}


Next we will implement `get_word_positions`. This method returns a list of integers that identifies the positions in the document `doc` (represented as document ID) in which the word is found.  This is basically just an API into your inverted index, but you must implement it in order for the index to be evaluated fully. 

**Be careful when accessing your inverted index!** Trying to index into a dictionary using a key that is not present will cause an error (and may cause your submission to crash the autograder).

In [4]:
class IRSystem:
    def __init__(self, docs, titles):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс для создания

    def index(self):
        """
        Создание инвертированного, позиционного индекса для документов.
        """
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

        # Преобразуем документы в отображение от ID к множеству слов (по желанию)
        id_to_bag_of_words = {d: set(doc.split()) for d, doc in enumerate(self.docs)}
        self.docs = id_to_bag_of_words

    def get_word_positions(self, word, doc_id):
        """
        Given a word and a document ID, return the positions of the specified word in the specified document.
        """
        # Проверяем, есть ли слово в инвертированном индексе
        if word in self.inv_index:
            # Проверяем, есть ли документ с указанным ID
            if doc_id in self.inv_index[word]:
                # Возвращаем позиции слова в документе
                return self.inv_index[word][doc_id]
        
        # Если слово или документ не найдены, возвращаем пустой список
        return []

# Пример использования
docs = ["this is the first document", "this document is the second document"]
titles = ["Doc 1", "Doc 2"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Получаем позиции слова в документе
word = "document"
doc_id = 1
positions = ir_system.get_word_positions(word, doc_id)
print(f"Positions of '{word}' in document {doc_id}: {positions}")


Индексация...
Positions of 'document' in document 1: [1, 5]


We will add another method, `get_posting`, that returns a list of integers (document IDs) that identifies the documents in which the word is found. This is basically another API into your inverted index, but you must implement it in order to be evaluated fully.

Keep in mind that the document IDs in each postings list to be sorted in order to perform the linear merge for boolean retrieval.

In [5]:
class IRSystem:
    def __init__(self, docs, titles):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс для создания

    def index(self):
        """
        Создание инвертированного, позиционного индекса для документов.
        """
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

        # Преобразуем документы в отображение от ID к множеству слов (по желанию)
        id_to_bag_of_words = {d: set(doc.split()) for d, doc in enumerate(self.docs)}
        self.docs = id_to_bag_of_words

    def get_posting(self, word):
        """
        Given a word, this returns the list of document indices (sorted) in
        which the word occurs.
        """
        # Проверяем, есть ли слово в инвертированном индексе
        if word in self.inv_index:
            # Извлекаем документные индексы и сортируем их
            posting = sorted(self.inv_index[word].keys())
            return posting
        
        # Если слово не найдено, возвращаем пустой список
        return []

    def get_posting_unstemmed(self, word):
        """
        Given a word, this *stems* the word and then calls get_posting on the
        stemmed word to get its postings list. You should *not* need to change
        this function. It is needed for submission.
        """
        word = self.p.stem(word)  # Предполагается, что self.p - это объект стеммера
        return self.get_posting(word)

# Пример использования
docs = ["this is the first document", "this document is the second document"]
titles = ["Doc 1", "Doc 2"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Получаем постинг для слова
word = "document"
postings = ir_system.get_posting(word)
print(f"Postings for '{word}': {postings}")


Индексация...
Postings for 'document': [0, 1]


Next, we will implement Boolean retrieval, returning a list of document IDs corresponding to the documents in which all the words in query occur.

Please implement the linear merge algorithm outlined in the videos/book (do not use built-in set intersection functions).

In [6]:
class IRSystem:
    def __init__(self, docs, titles):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс для создания

    def index(self):
        """
        Создание инвертированного, позиционного индекса для документов.
        """
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

        # Преобразуем документы в отображение от ID к множеству слов (по желанию)
        id_to_bag_of_words = {d: set(doc.split()) for d, doc in enumerate(self.docs)}
        self.docs = id_to_bag_of_words

    def boolean_retrieve(self, query):
        """
        Given a query in the form of a list of *stemmed* words, this returns
        the list of documents in which *all* of those words occur (ie an AND
        query).
        Return an empty list if the query does not return any documents.
        """
        # Если запрос пустой, возвращаем пустой список
        if not query:
            return []

        # Инициализируем множество с документами из первого слова
        docs = set(self.inv_index.get(query[0], {}).keys())

        # Перебираем оставшиеся слова в запросе
        for word in query[1:]:
            if word in self.inv_index:
                # Пересекаем с множеством документов текущего слова
                docs &= set(self.inv_index[word].keys())
            else:
                # Если слово не найдено, возвращаем пустой список
                return []

        return sorted(docs)  # Возвращаем отсортированный список документов

# Пример использования
docs = ["this is the first document", "this document is the second document", "third document"]
titles = ["Doc 1", "Doc 2", "Doc 3"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Пример запроса
query = ["document", "first"]
results = ir_system.boolean_retrieve(query)
print(f"Documents matching {query}: {results}")


Индексация...
Documents matching ['document', 'first']: [0]


Let's continue to phase query retrieval. Our `phrase_retrieve` method will return a list of document IDs corresponding to the documents in which all the actual query phrase occurs.

In [7]:
class IRSystem:
    def __init__(self, docs, titles):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс для создания

    def index(self):
        """
        Создание инвертированного, позиционного индекса для документов.
        """
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

        # Преобразуем документы в отображение от ID к множеству слов (по желанию)
        id_to_bag_of_words = {d: set(doc.split()) for d, doc in enumerate(self.docs)}
        self.docs = id_to_bag_of_words

    def phrase_retrieve(self, query):
        """
        Given a query in the form of an ordered list of *stemmed* words, this 
        returns the list of documents in which *all* of those words occur, and 
        in the specified order. 
        Return an empty list if the query does not return any documents. 
        """
        # Если запрос пустой, возвращаем пустой список
        if not query:
            return []

        # Получаем документы, содержащие первое слово запроса
        doc_ids = set(self.inv_index.get(query[0], {}).keys())

        # Фильтруем документы, проверяя порядок слов
        for word in query[1:]:
            if word in self.inv_index:
                # Пересекаем с множеством документов текущего слова
                doc_ids &= set(self.inv_index[word].keys())
            else:
                # Если слово не найдено, возвращаем пустой список
                return []

        # Теперь, для оставшихся doc_ids, проверяем порядок слов
        results = []
        for doc_id in doc_ids:
            positions = [self.inv_index[query[0]][doc_id]]  # Получаем позиции первого слова
            # Для последующих слов, проверяем наличие и порядок
            for word in query[1:]:
                if doc_id in self.inv_index[word]:
                    positions.append(self.inv_index[word][doc_id])
                else:
                    break  # Если слово не найдено, выходим из цикла
            else:
                # Проверяем, что все слова находятся в правильном порядке
                if self.is_phrase_in_order(positions):
                    results.append(doc_id)

        return sorted(results)  # Возвращаем отсортированный список документов

    def is_phrase_in_order(self, positions):
        """
        Проверяет, находятся ли позиции слов в правильном порядке.
        """
        # Получаем первый список позиций
        current_position = positions[0][0]
        for pos_list in positions[1:]:
            # Ищем следующее слово с позицией больше текущей
            next_position = next((p for p in pos_list if p > current_position), None)
            if next_position is None:
                return False  # Если следующее слово не найдено в правильном порядке
            current_position = next_position  # Обновляем текущую позицию
        return True

# Пример использования
docs = [
    "this is the first document",
    "this document is the second document",
    "the first document is here",
    "third document"
]
titles = ["Doc 1", "Doc 2", "Doc 3", "Doc 4"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Пример запроса
query = ["the", "first", "document"]
results = ir_system.phrase_retrieve(query)
print(f"Documents matching the phrase {query}: {results}")


Индексация...
Documents matching the phrase ['the', 'first', 'document']: [0, 2]


Now we will compute and score the tf-idf values. compute_tfidf and stores the tf-idf values for words and documents. For this you will probably want to be aware of the class variable vocab, which holds the list of all unique words, as well as the inverted index you created earlier.

You must also implement `get_tfidf` to return the tf-idf weight for a particular word and document ID. Make sure you correctly handle the case where the word isn't present your vocabulary! 

In [12]:
import math

class IRSystem:
    def __init__(self, docs, titles, stemmer=None):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс
        self.vocab = set()  # Уникальные слова
        self.tfidf = {}  # TF-IDF значения
        self.p = stemmer  # Опциональный стеммер

    def index(self):
        """Создание инвертированного индекса для документов."""
        print("Индексация...")

        inv_index = {}

        # Проходим по всем документам
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Разделяем документ на слова
            self.vocab.update(words)  # Добавляем слова в словарь
            for pos, word in enumerate(words):
                # Если слово не в индексе, добавляем его с пустым словарем
                if word not in inv_index:
                    inv_index[word] = {}

                # Если ID документа еще не в записи слова, добавляем его с пустым списком
                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                # Добавляем позицию слова в документе
                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index  # Сохраняем инвертированный индекс

    def get_posting(self, word):
        """Возвращает отсортированный список индексов документов, где встречается слово."""
        return sorted(self.inv_index.get(word, {}).keys())

    def compute_tfidf(self):
        print("Calculating tf-idf...")
        self.tfidf = {}

        total_docs = len(self.docs)

        # Считаем IDF для каждого слова
        idf = {}
        for word in self.vocab:
            num_docs_with_word = len(self.get_posting(word))
            idf[word] = math.log(total_docs / (num_docs_with_word + 1))  # +1 для избежания деления на ноль

        # Считаем TF и TF-IDF
        for doc_id, doc in enumerate(self.docs):
            words = doc.split()  # Получаем список слов из документа
            total_words = len(words)
            word_count = {word: words.count(word) for word in set(words)}  # Считаем частоты слов
            
            for word, count in word_count.items():
                tf = count / total_words  # TF = (частота слова в документе) / (общее количество слов)
                tfidf_value = tf * idf.get(word, 0)  # TF-IDF = TF * IDF
                if doc_id not in self.tfidf:
                    self.tfidf[doc_id] = {}
                self.tfidf[doc_id][word] = tfidf_value  # Сохраняем TF-IDF

    def get_tfidf(self, word, document):
        """Возвращаем TF-IDF значение для данного слова и документа."""
        return self.tfidf.get(document, {}).get(word, 0.0)  # Возвращаем 0.0, если слово или документ не найдены

    def get_tfidf_unstemmed(self, word, document):
        """
        Получаем TF-IDF для не стеммированного слова в документе.
        Стеммируем слово и затем вызываем get_tfidf.
        """
        if self.p:  # Проверяем, есть ли стеммер
            word = self.p.stem(word)  # Стеммируем слово
        return self.get_tfidf(word, document)

# Пример использования
docs = [
    "this is the first document",
    "this document is the second document",
    "the first document is here",
    "third document"
]
titles = ["Doc 1", "Doc 2", "Doc 3", "Doc 4"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Вычисляем TF-IDF
ir_system.compute_tfidf()

# Получаем TF-IDF для слова в документе
word = "document"
document_id = 1  # Например, "Doc 2"
tfidf_value = ir_system.get_tfidf(word, document_id)
print(f"TF-IDF for word '{word}' in document {document_id}: {tfidf_value}")





Индексация...
Calculating tf-idf...
TF-IDF for word 'document' in document 1: -0.07438118377140324


Lastly, we will implement `rank_retrieve`. This function returns sorted list of the top ranked documents for a given query. Right now it ranks documents according to their Jaccard similarity with the query, but you will replace this method of ranking with a ranking using the <b>cosine similarity</b> between the documents and query.
    
Remember to use ltc.lnn weighting, that is, ltc weighting for the document and lnn weighting for the query! This means that the query vector weights will just be $\text{tf}_{t, q} = 1 + \log_{10} count(t, q)$  with no IDF term or normalization (or $\text{tf}_{t, q} = 0$ if the term is not present in the query), but we do normalize the document vector weights by the magnitude of the document vector (square root of the sum of squares of the tf-idf weights). Finally, the cosine scores are the dot product of the query vector and the document vectors. Refer to this [handout](http://web.stanford.edu/class/cs124/lec/CS124_IR_Handout.pdf) or lecture slides for a more detailed explanation.
    
Just to be clear, when we say normalize by "document length" or "length of document", we mean the length of the document vector, NOT the number of words in the actual text document. So, when you’re computing cosine similarity between the query and document, the document vector is the tf-idf document weights for all terms in the vocabulary. The document length would be the L2 norm of the document vector.

In [14]:
import math

class IRSystem:
    def __init__(self, docs, titles, stemmer=None):
        self.docs = docs  # Список документов
        self.titles = titles  # Список заголовков
        self.inv_index = {}  # Инвертированный индекс
        self.vocab = set()  # Уникальные слова
        self.tfidf = {}  # TF-IDF значения
        self.p = stemmer  # Опциональный стеммер

    def index(self):
        """Создание инвертированного индекса для документов."""
        print("Индексация...")
        inv_index = {}

        for doc_id, doc in enumerate(self.docs):
            words = doc.split()
            self.vocab.update(words)
            for pos, word in enumerate(words):
                if word not in inv_index:
                    inv_index[word] = {}

                if doc_id not in inv_index[word]:
                    inv_index[word][doc_id] = []

                inv_index[word][doc_id].append(pos)

        self.inv_index = inv_index

    def get_posting(self, word):
        """Возвращает отсортированный список индексов документов, где встречается слово."""
        return sorted(self.inv_index.get(word, {}).keys())

    def compute_tfidf(self):
        print("Calculating tf-idf...")
        self.tfidf = {}
        total_docs = len(self.docs)

        idf = {}
        for word in self.vocab:
            num_docs_with_word = len(self.get_posting(word))
            idf[word] = math.log(total_docs / (num_docs_with_word + 1))

        for doc_id, doc in enumerate(self.docs):
            words = doc.split()
            total_words = len(words)
            word_count = {word: words.count(word) for word in set(words)}
            
            for word, count in word_count.items():
                tf = count / total_words
                tfidf_value = tf * idf.get(word, 0)
                if doc_id not in self.tfidf:
                    self.tfidf[doc_id] = {}
                self.tfidf[doc_id][word] = tfidf_value

    def get_tfidf(self, word, document):
        """Возвращаем TF-IDF значение для данного слова и документа."""
        return self.tfidf.get(document, {}).get(word, 0.0)

    def get_tfidf_vector(self, document_id):
        """Возвращаем TF-IDF вектор для указанного документа."""
        return [self.get_tfidf(word, document_id) for word in self.vocab]

    def rank_retrieve(self, query):
        """
        Given a query (a list of words), return a rank-ordered list of
        documents (by ID) and score for the query.
        """
        scores = [0.0 for _ in range(len(self.titles))]
        
        # Step 1: Create a TF-IDF vector for the query
        query_tf = {}
        for word in query:
            query_tf[word] = query_tf.get(word, 0) + 1
            
        total_query_words = len(query)
        query_vector = {}
        for word, count in query_tf.items():
            tf = count / total_query_words  # Calculate TF for the query word
            idf = math.log(len(self.docs) / (len(self.get_posting(word)) + 1))  # Calculate IDF
            query_vector[word] = tf * idf  # Compute TF-IDF for the query

        # Step 2: Compute cosine similarity for each document
        for d in range(len(self.docs)):
            doc_vector = self.get_tfidf_vector(d)  # Get TF-IDF vector for document d
            cosine_similarity = self.cosine_similarity(query_vector, doc_vector)
            scores[d] = cosine_similarity

        # Step 3: Rank documents by scores
        ranking = sorted(range(len(scores)), key=lambda x: scores[x], reverse=True)
        results = [(ranking[i], scores[ranking[i]]) for i in range(min(10, len(ranking)))]

        return results

    def cosine_similarity(self, query_vector, doc_vector):
        """Calculate the cosine similarity between query and document vectors."""
        dot_product = sum(query_vector.get(word, 0) * doc_vector[i] for i, word in enumerate(self.vocab))
        query_magnitude = math.sqrt(sum(val ** 2 for val in query_vector.values()))
        doc_magnitude = math.sqrt(sum(val ** 2 for val in doc_vector))

        if query_magnitude == 0 or doc_magnitude == 0:
            return 0.0

        return dot_product / (query_magnitude * doc_magnitude)

# Пример использования
docs = [
    "this is the first document",
    "this document is the second document",
    "the first document is here",
    "third document"
]
titles = ["Doc 1", "Doc 2", "Doc 3", "Doc 4"]

# Создаем экземпляр IRSystem
ir_system = IRSystem(docs, titles)

# Вызываем метод индексации для создания инвертированного индекса
ir_system.index()

# Вычисляем TF-IDF
ir_system.compute_tfidf()

# Пример запроса
query = ["document", "first"]
results = ir_system.rank_retrieve(query)
print("Ranked Results:", results)


Индексация...
Calculating tf-idf...
Ranked Results: [(0, 0.7846198901675911), (2, 0.46501153043911886), (1, 0.3132670537154066), (3, 0.18781639755086263)]


Let's also add a few methods that given a string, will process and then return the list of matching documents for the different methods you have implemented. You do not need to add any additional code here.

In [17]:
class IRSystem(IRSystem): 
    def process_query(self, query_str):
        """
        Given a query string, process it and return the list of lowercase,
        alphanumeric, stemmed words in the string.
        """
        # make sure everything is lower case
        query = query_str.lower()
        # split on whitespace
        query = query.split()
        # remove non alphanumeric characters
        query = [self.alphanum.sub('', xx) for xx in query]
        # stem words
        query = [self.p.stem(xx) for xx in query]
        return query

    def query_retrieve(self, query_str):
        """
        Given a string, process and then return the list of matching documents
        found by boolean_retrieve().
        """
        query = self.process_query(query_str)
        return self.boolean_retrieve(query)

    def phrase_query_retrieve(self, query_str):
        """
        Given a string, process and then return the list of matching documents
        found by phrase_retrieve().
        """
        query = self.process_query(query_str)
        return self.phrase_retrieve(query)

    def query_rank(self, query_str):
        """
        Given a string, process and then return the list of the top matching
        documents, rank-ordered.
        """
        query = self.process_query(query_str)
        return self.rank_retrieve(query)

## Running the code
You can use the `run_tests` and `run_query` functions to test your code. `run_tests` tests how different components your search engine code perform on a small set of queries and checks whether or not it matches up with our solution's results. `run_query` can be used to test your code on individual queries. 

Note that the first run for either of these functions might take a little while, since it will stem all the words in every document create a directory named stemmed/ in ../data/ProjectGutenberg/. This is meant to be a simple cache for the stemmed versions of the text documents. Later runs will be much faster after the first run since all the stemming will already be completed. However, this means that **if something happens during this first run and it does not get through processing all the documents, you may be left with an incomplete set of documents in ../data/ProjectGutenberg/stemmed/. If this happens, simply remove the stemmed/ directory and re-run!**

In [15]:
def run_tests(irsys):
    print("===== Running tests =====")

    ff = open('./data/dev_queries.txt')
    questions = [xx.strip() for xx in ff.readlines()]
    ff.close()
    ff = open('./data/dev_solutions.txt')
    solutions = [xx.strip() for xx in ff.readlines()]
    ff.close()

    epsilon = 1e-4
    for part in range(6):
        points = 0
        num_correct = 0
        num_total = 0

        prob = questions[part]
        soln = json.loads(solutions[part])
        

        if part == 0:  # inverted index test
            print("Inverted Index Test")
            queries = prob.split("; ")
            queries = [xx.split(", ") for xx in queries]
            queries = [(xx[0], int(xx[1])) for xx in queries]
            for i, (word, doc) in enumerate(queries):
                num_total += 1
                guess = irsys.get_word_positions(word, doc)
                if sorted(guess) == soln[i]:
                    num_correct += 1

        if part == 1:  # get postings test
            print("Get Postings Test")
            words = prob.split(", ")
            for i, word in enumerate(words):
                num_total += 1
                posting = irsys.get_posting_unstemmed(word)
                if posting == soln[i]:
                    num_correct += 1

        elif part == 2:  # boolean retrieval test
            print("Boolean Retrieval Test")
            queries = prob.split(", ")
            for i, query in enumerate(queries):
                num_total += 1
                guess = irsys.query_retrieve(query)
                if set(guess) == set(soln[i]):
                    num_correct += 1

        elif part == 3:  # phrase query test
            print("Phrase Query Retrieval")
            queries = prob.split(", ")
            for i, query in enumerate(queries):
                num_total += 1
                guess = irsys.phrase_query_retrieve(query)
                if set(guess) == set(soln[i]):
                    num_correct += 1

        elif part == 4:  # tfidf test
            print("TF-IDF Test")
            queries = prob.split("; ")
            queries = [xx.split(", ") for xx in queries]
            queries = [(xx[0], int(xx[1])) for xx in queries]
            for i, (word, doc) in enumerate(queries):
                num_total += 1
                guess = irsys.get_tfidf_unstemmed(word, doc)
                if guess >= float(soln[i]) - epsilon and \
                        guess <= float(soln[i]) + epsilon:
                    num_correct += 1

        elif part == 5:  # cosine similarity test
            print("Cosine Similarity Test")
            queries = prob.split(", ")
            for i, query in enumerate(queries):
                num_total += 1
                ranked = irsys.query_rank(query)
                top_rank = ranked[0]
                if top_rank[0] == soln[i][0]:
                    if top_rank[1] >= float(soln[i][1]) - epsilon and \
                            top_rank[1] <= float(soln[i][1]) + epsilon:
                        num_correct += 1

        feedback = "%d/%d Correct. Accuracy: %f" % \
                   (num_correct, num_total, float(num_correct) / num_total)

        if part == 1:
            if num_correct == num_total:
                points = 2
            elif num_correct >= 0.5 * num_total:
                points = 1
            else:
                points = 0
        elif part == 2:
            if num_correct == num_total:
                points = 1
            else:
                points = 0
        else:
            if num_correct == num_total:
                points = 3
            elif num_correct > 0.75 * num_total:
                points = 2
            elif num_correct > 0:
                points = 1
            else:
                points = 0

        print("    Score: %d Feedback: %s" % (points, feedback))

In [33]:
import os
import math
import re
from collections import defaultdict

class IRSystem:
    def __init__(self):
        self.docs = {}  # Mapping from doc ID to text
        self.titles = []  # Titles of the documents
        self.inv_index = defaultdict(lambda: defaultdict(list))  # Inverted index
        self.vocab = set()  # Vocabulary set
        self.tfidf = {}  # TF-IDF storage

    def read_data(self, data_path):
        """Read documents from the specified directory."""
        print("Reading data...")
        for filename in os.listdir(data_path):
            if filename.endswith('.txt'):
                with open(os.path.join(data_path, filename), 'r', encoding='utf-8') as file:
                    text = file.read()
                    doc_id = len(self.docs)
                    self.docs[doc_id] = text
                    self.titles.append(filename)
                    self.vocab.update(re.findall(r'\w+', text.lower()))  # Update vocabulary

    def index(self):
        """Create an inverted index for the documents."""
        print("Indexing...")
        for doc_id, text in self.docs.items():
            words = re.findall(r'\w+', text.lower())
            for pos, word in enumerate(words):
                self.inv_index[word][doc_id].append(pos)

    def compute_tfidf(self):
        """Compute the TF-IDF values for each word in each document."""
        print("Calculating TF-IDF...")
        total_docs = len(self.docs)
        idf = {}

        # Calculate IDF for each word
        for word in self.vocab:
            num_docs_with_word = len(self.inv_index[word])
            idf[word] = math.log(total_docs / (num_docs_with_word + 1))  # Smoothing

        # Calculate TF-IDF
        for doc_id, text in self.docs.items():
            words = re.findall(r'\w+', text.lower())
            tf = defaultdict(int)

            for word in words:
                tf[word] += 1

            total_words = len(words)

            for word in tf:
                tf_value = tf[word] / total_words  # Term Frequency
                tfidf_value = tf_value * idf[word]  # TF-IDF
                if doc_id not in self.tfidf:
                    self.tfidf[doc_id] = {}
                self.tfidf[doc_id][word] = tfidf_value  # Store TF-IDF

# Example usage
irsys = IRSystem()
irsys.read_data('./data/ProjectGutenberg')
irsys.index()
irsys.compute_tfidf()

# Assuming you have a function to run tests
# run_tests(irsys)


Reading data...
Indexing...
Calculating TF-IDF...


In [35]:
import os
import math
import re
from collections import defaultdict

class IRSystem:
    def __init__(self):
        self.docs = {}  # Mapping from doc ID to text
        self.titles = []  # Titles of the documents
        self.inv_index = defaultdict(lambda: defaultdict(list))  # Inverted index
        self.vocab = set()  # Vocabulary set
        self.tfidf = {}  # TF-IDF storage

    def read_data(self, data_path):
        """Read documents from the specified directory."""
        print("Reading data...")
        for filename in os.listdir(data_path):
            if filename.endswith('.txt'):
                with open(os.path.join(data_path, filename), 'r', encoding='utf-8') as file:
                    text = file.read()
                    doc_id = len(self.docs)
                    self.docs[doc_id] = text
                    self.titles.append(filename)
                    self.vocab.update(re.findall(r'\w+', text.lower()))  # Update vocabulary

    def index(self):
        """Create an inverted index for the documents."""
        print("Indexing...")
        for doc_id, text in self.docs.items():
            words = re.findall(r'\w+', text.lower())
            for pos, word in enumerate(words):
                self.inv_index[word][doc_id].append(pos)

    def compute_tfidf(self):
        """Compute the TF-IDF values for each word in each document."""
        print("Calculating TF-IDF...")
        total_docs = len(self.docs)
        idf = {}

        # Calculate IDF for each word
        for word in self.vocab:
            num_docs_with_word = len(self.inv_index[word])
            idf[word] = math.log(total_docs / (num_docs_with_word + 1))  # Smoothing

        # Calculate TF-IDF
        for doc_id, text in self.docs.items():
            words = re.findall(r'\w+', text.lower())
            tf = defaultdict(int)

            for word in words:
                tf[word] += 1

            total_words = len(words)

            for word in tf:
                tf_value = tf[word] / total_words  # Term Frequency
                tfidf_value = tf_value * idf[word]  # TF-IDF
                if doc_id not in self.tfidf:
                    self.tfidf[doc_id] = {}
                self.tfidf[doc_id][word] = tfidf_value  # Store TF-IDF

    def query_rank(self, query_str):
        """Process the query and rank the documents."""
        query = self.process_query(query_str)
        return self.rank_retrieve(query)

    def process_query(self, query_str):
        """Process the query string to extract words."""
        # Lowercase, split, and remove non-alphanumeric characters
        query = query_str.lower()
        query = re.findall(r'\w+', query)
        return query

    def rank_retrieve(self, query):
        """Rank documents based on cosine similarity."""
        scores = [0.0] * len(self.titles)
        
        for d, words_in_doc in self.docs.items():
            for word in query:
                if word in self.inv_index:
                    # Calculate score based on TF-IDF
                    tfidf_value = self.tfidf[d].get(word, 0)
                    scores[d] += tfidf_value

        ranking = sorted(enumerate(scores), key=lambda x: x[1], reverse=True)
        return ranking

# Run any query you want!
def run_query(query):
    irsys = IRSystem()
    irsys.read_data('./data/ProjectGutenberg')
    irsys.index()
    irsys.compute_tfidf()
    
    print("Best matching documents to '%s':" % query)
    results = irsys.query_rank(query)
    for docId, score in results:
        print("%s: %e" % (irsys.titles[docId], score))

# Example usage
run_query("My very own query")


Reading data...
Indexing...
Calculating TF-IDF...
Best matching documents to 'My very own query':


For reference, you can run the cell below to see the titles of all the documents in our dataset. As sanity checks, you can try tailoring your queries in `run_query` to output certain titles and/or checking what IR system outputs against the list of titles to see if the results make sense (i.e. the book _Great Pianists on Piano Playing_ should probably be among the top results if the query is "pianists play piano").

In [36]:
# Prints full list of book titles in dataset (in alphabetical order)
for i in range(len(irsys.titles)):
    print("%d. %s" % (i + 1, irsys.titles[i]))

## Information Retrieval Systems and Search Engines

Safiya Umoja Noble’s <a href="https://nyupress.org/9781479837243/algorithms-of-oppression/">Algorithms of Oppression</a> (2018) provides insight into how search engines and information retrieval algorithms can exhibit substantial racist and sexist biases. Noble demonstrates how prejudice against black women is embedded into search engine ranked results. These biases are apparent in both Google search’s autosuggestions and the first page of Google results. In this assignment, we have explored numerous features that are built into information retrieval systems, like Google Search. 

How could the algorithms you built in this assignment contribute to enforcing real-world biases? 

How can we reduce data discrimination and algorithmic bias that perpetuate gender and racial inequalities in search results and IR system?

Please provide a short response to each of these questions (1-2 paragraphs per question).

In [37]:
class IRSystem(IRSystem):
    def algorithmic_bias_in_IR_systems(self):
        # TODO: Place your response into the response string below
        response = ""
        return response

Once you're ready to submit, you can run the cell below to prepare and zip
up your solution:

In [38]:
%%bash

if [[ ! -f "./pa4.ipynb" ]]
then
    echo "WARNING: Did not find notebook in Jupyter working directory. This probably means you're running on Google Colab. You'll need to go to File->Download .ipynb to download your notebok and other files, then zip them locally. See the README for more information."
else
    echo "Found notebook file, creating submission zip..."
    zip -r submission.zip pa4.ipynb deps/
fi

Couldn't find program: 'bash'


If you're running on Google Colab, see the README for instructions on
how to submit.

__Best of luck!__

__Some reminders for submission:__
* If you have any extra files required for your implementation to work, make
 sure they are in a `deps/` folder on the same level as `pa4.ipynb` and
 include that folder in your submission zip file.
 * Make sure you didn't accidentally change the name of your notebook file,
 (it should be `pa4.ipynb`) as that is required for the autograder to work.
* Go to Gradescope (gradescope.com), find the PA4 IR assignment and
upload your zip file (`submission.zip`) as your solution.
* Wait for the autograder to run (it should only take a minute or so) and check
that your submission was graded successfully! If the autograder fails, or you
get an unexpected score it may be a sign that your zip file was incorrect.