In [None]:
from functools import total_ordering, reduce
import re
import csv
from tqdm import tqdm
from BTrees.OOBTree import OOBTree

In [42]:
@total_ordering
class Posting:

    def __init__(self, doc_id: int):
        self.doc_id = doc_id

    def __eq__(self, other):
        return other == self.doc_id

    def __gt__(self, other):
        return self.doc_id > other

    def __repr__(self) -> str:
        return str(self.doc_id)

    def from_corpus(self, corpus):
        return corpus[self.doc_id]

In [43]:
class PostingsList:

    def __init__(self) -> None:
        self._postings_list = []

    @classmethod
    def from_postings_list(cls, postings_list: list[Posting]) -> 'PostingsList':
        plist = cls()
        postings_list.sort()
        plist._postings_list = postings_list
        return plist

    @classmethod
    def from_doc_id(cls, doc_id: int):
        plist = cls()
        plist._postings_list = [Posting(doc_id)]
        return plist

    def merge(self, other: "PostingsList") -> 'PostingsList':
        # A method to merge another PostingList into this one, avoiding duplicates.
        i = 0  # Start index for the other PostingList.
        last = self._postings_list[-1]  # The last Posting in the current list.
        # Loop through the other PostingList and skip duplicates.
        while (i < len(other._postings_list) and last == other._postings_list[i]):
            i += 1  # Increment the index if a duplicate is found.
        # Append the non-duplicate postings from the other list.
        self._postings_list += other._postings_list[i:]
        return self

    def get_from_corpus(self, corpus):
        return list(map(lambda x: x.from_corpus(corpus), self._postings_list))

    def intersection(self, other: "PostingsList") -> 'PostingsList':
        plist = []
        i = 0
        j = 0
        while (i < len(self._postings_list)) and (j < len(other._postings_list)):
            if self._postings_list[i] == other._postings_list[j]:
                plist.append(self._postings_list[i])
                i += 1
                j += 1
            elif self._postings_list[i] <= other._postings_list[j]:
                i += 1
            else:
                j += 1
        return PostingsList.from_postings_list(plist)

    def union(self, other: "PostingsList") -> 'PostingsList':
        plist = []
        i = 0
        j = 0
        while (i < len(self._postings_list)) and (j < len(other._postings_list)):
            if self._postings_list[i] == other._postings_list[j]:
                plist.append(self._postings_list[i])
                i += 1
                j += 1
            elif self._postings_list[i] < other._postings_list[j]:
                plist.append(self._postings_list[i])
                i += 1
            else:
                plist.append(other._postings_list[j])
                j += 1
        if i < len(self._postings_list):
            plist += self._postings_list[i:]
        elif j < len(other._postings_list):
            plist += other._postings_list[j:]
        return PostingsList.from_postings_list(plist)

    def __repr__(self) -> str:
        return ", ".join(map(str, self._postings_list))

In [44]:
class ImpossibleMergeException(Exception):
    pass


@total_ordering
class Term:
    def __init__(self, term: str, doc_id: int) -> None:
        self.term = term
        self.postings_list = PostingsList.from_doc_id(doc_id)

    def merge(self, other: "Term") -> 'Term':
        if self == other:
            self.postings_list.merge(other.postings_list)
        else:
            raise ImpossibleMergeException
        return self

    def __eq__(self, other) -> bool:
        return self.term == other.term

    def __gt__(self, other) -> bool:
        return self.term > other.term

    def __repr__(self) -> str:
        return self.term + ": " + str(self.postings_list)

In [45]:
class TrieNode:

    def __init__(self):
        self.children = dict()
        self.is_leaf = False
        self.postings_list = None

    def set_postings_list(self, postings_list: PostingsList) -> None:
        self.postings_list = postings_list
        self.is_leaf = True

    def __repr__(self) -> str:
        to_return = ""
        if self.is_leaf:
            to_return += ": " + str(self.postings_list) + "\n"
        for key in self.children:
            to_return += str(key) + self.children[key].__repr__()
        return to_return

In [46]:
class MissingKeyException(Exception):
    pass


class Trie:

    def __init__(self) -> None:
        self.root = TrieNode()

    def insert(self, node: Term) -> 'Trie':
        current = self.root
        for char in node.term:
            if char not in current.children:
                current.children[char] = TrieNode()
            current = current.children[char]
        if current.postings_list is not None:
            current.postings_list.merge(node.postings_list)
        else:
            current.set_postings_list(node.postings_list)
        return self

    def search(self, key: str) -> PostingsList:
        current = self.root
        for char in key:
            if char in current.children:
                current = current.children[char]
            else:
                raise MissingKeyException
        if current.postings_list is not None:
            return current.postings_list
        else:
            raise MissingKeyException

    def remove(self, key: str) -> None:
        current = self.root
        for char in key:
            idx = ord(char) - ord('a')
            child = current.children[idx]
            if child is None:
                raise MissingKeyException
            else:
                current = child
        current = None

    def merge(self, other: "Trie"):
        stack = [(self.root, other.root)]
        while stack:
            node_self, node_other = stack.pop()
            if node_other.postings_list is not None:
                if node_self.postings_list is not None:
                    node_self.postings_list.merge(node_other.postings_list)
                else:
                    node_self.set_postings_list(node_other.postings_list)
            for key in node_other.children:
                value = node_other.children[key]
                if key not in node_self.children:
                    node_self.children[key] = value
                else:
                    stack += [(node_self.children[key], value)]

    def __repr__(self) -> str:
        return self.root.__repr__()

In [47]:
def normalize(text):
    # Removes punctuation from the text using a regular expression.
    no_punctuation = re.sub(r'[^\w\s^-]', '', text)
    # Converts the text to lowercase.
    downcase = no_punctuation.lower()
    # Returns the normalized text.
    return downcase


def tokenize(content) -> list:
    normalized = normalize(content)
    return normalized.split()


class InvertedIndex:

    def __init__(self) -> None:
        self.btree = OOBTree()

    @classmethod
    def from_corpus(cls, corpus):
        terms = {}
        # per ogni documento
        for doc_id, content in enumerate(tqdm(corpus)):
            tokens = tokenize(content.description)
            # per ogni parola
            for token in tokens:
                if token in terms:
                    terms[token].merge(PostingsList.from_doc_id(doc_id))
                else:
                    terms[token] = PostingsList.from_doc_id(doc_id)

                # mettilo nel trie
        idx = cls()
        idx.btree.update(terms)
        return idx

    def __getitem__(self, key: str) -> PostingsList:
        # check if empty
        return self.btree[key]

    def __repr__(self) -> str:
        return ""

In [48]:
class MovieDescription:
    def __init__(self, title: str, description: str):
        self.title = title
        self.description = description

    def __repr__(self) -> str:
        return self.title


def read_movie_description(movie_metadata, description_file):
    names = {}
    corpus = []
    with open(movie_metadata, 'r') as file:
        movie_names = csv.reader(file, delimiter='\t')
        for description in movie_names:
            names[description[0]] = description[2]
    with open(description_file, 'r') as file:
        descriptions = csv.reader(file, delimiter='\t')
        for description in descriptions:
            try:
                corpus.append(MovieDescription(
                    names[description[0]], description[1]))
            except KeyError:
                pass
    return corpus

In [None]:
class IrSystem:
    def __init__(self, corpus: list[MovieDescription], index: InvertedIndex) -> None:
        self._corpus = corpus
        self._index = index

    @classmethod
    def from_corpus(cls, corpus: list[MovieDescription]) -> 'IrSystem':
        index = InvertedIndex.from_corpus(corpus)
        return cls(corpus, index)

    def optimize_and_query(self, terms: list[str]):
        plist = map(lambda x: self._index[x], terms)
        plist = sorted(plist, key=lambda x: len(x))
        result = reduce(lambda x, y: x.intersection(y), plist)
        return result

    def flatten_and_chains(self, query: str):
        tokens = query.split()
        postfix = infix_to_postfix(tokens)
        stack = []
        for token in postfix:
            if token == 'AND':
                left = stack.pop()
                right = stack.pop()
                if not isinstance(left, list):
                    left = [left]
                if not isinstance(right, list):
                    right = [right]
                stack.append(left + right)
            elif token in ('OR', 'NOT'):
                left = stack.pop()
                right = stack.pop
                if isinstance(left, list):
                    left = self.optimize_and_query(left)
                if isinstance(right, list):
                    right = self.optimize_and_query(right)
                if token == 'OR':
                    stack.append(left.union(right))
                else:
                    stack.append(left.negation(right))
            else:
                stack.append(token)
        result = stack.pop()
        if isinstance(result, list):
            result = self.optimize_and_query(result)
        return result.get_from_corpus(self._corpus)


def infix_to_postfix(tokens):
    output = []
    stack = []

    for token in tokens:
        token = token.upper()
        if token in ('AND', 'OR', 'NOT'):
            while (stack and stack[-1] != '('):
                output.append(stack.pop())
            stack.append(token)
        elif token == '(':
            stack.append(token)
        elif token == ')':
            while stack and stack[-1] != '(':
                output.append(stack.pop())
            stack.pop()  # remove '('
        else:
            output.append(token)

    while stack:
        output.append(stack.pop())
    return output

In [50]:
corpus = read_movie_description(
    '../Code IR/data/movie.metadata.tsv', '../Code IR/data/plot_summaries.txt')

In [51]:
ir = IrSystem.from_corpus(corpus)

100%|██████████| 42204/42204 [00:26<00:00, 1621.25it/s]


In [53]:
ir.flatten_and_chains('luke')

AttributeError: 'str' object has no attribute 'get_from_corpus'