In [1]:
import os
import shutil

import math
from collections import defaultdict, Counter
from typing import List
import sys

In [2]:
class Singleton:
    """Base class that gives every subclass exactly one shared instance.

    The first construction of a given class creates the instance, sets its
    ``_index`` attribute to None and caches it on that class; every later
    construction of the same class returns the cached object. ``__init__``
    still runs on every construction, so subclasses that must keep their
    state can use ``_index is None`` as a "not initialised yet" marker.
    """

    # Holds the unique instance of the class (None until first construction).
    _instance = None

    def __new__(cls, *args, **kwargs):
        """Return the unique instance of ``cls``, creating it on first use.

        Constructor arguments are accepted so that subclasses may define an
        ``__init__`` with parameters, but they are not forwarded to
        ``object.__new__``, which rejects extra arguments.
        """
        # Look only at this class's own namespace: an instance cached on a
        # parent class must not be handed out for a subclass.
        if cls.__dict__.get("_instance") is None:
            instance = super().__new__(cls)
            instance._index = None
            cls._instance = instance
        return cls._instance
        
def create_folder(folder_name: str) -> None :
    """Create the directory ``folder_name`` (including parents) unless the path already exists."""
    if os.path.exists(folder_name):
        # Something is already there: leave it untouched.
        return
    os.makedirs(folder_name)
        
 # Write the content of the data structure "struct" to disk, into the file "file_name"
def write_to_block(file_name: str, struct: defaultdict) -> None:
    """Append every row of ``struct`` to the file ``file_name``, one row per line.

    Each value of ``struct`` must expose ``to_string()``. The file is opened in
    append mode, and a newline is written after the last row (so an empty
    ``struct`` still appends one empty line).
    """
    serialized_rows = [row.to_string() for row in struct.values()]
    with open(file_name, "a") as out:
        out.write("\n".join(serialized_rows))
        out.write("\n")

In [3]:
class DocumentIndexRow:
    """One row of the document index: a document id and that document's length in tokens."""

    def __init__(self, doc_no: int, text: str) -> None:
        """Store ``doc_no`` and the whitespace-token count of ``text``.

        Raises:
            ValueError: if ``doc_no`` is not an int or ``text`` is not a str.
        """
        arguments_ok = isinstance(doc_no, int) and isinstance(text, str)
        if not arguments_ok:
            raise ValueError("doc_no must be an integer and text must be a string.")

        self.doc_id = doc_no
        self.document_length = self.count_words(text)

    def count_words(self, text: str):
        """Return the number of whitespace-separated tokens in ``text``.

        Raises:
            ValueError: if ``text`` is not a str.
        """
        if isinstance(text, str):
            # Blank or whitespace-only text holds no tokens.
            tokens = text.split() if text.strip() else []
            return len(tokens)
        raise ValueError("text must be a string.")

    def to_string(self):
        """Serialize the row as "<doc_id> <document_length>"."""
        fields = (self.doc_id, self.document_length)
        return ' '.join(map(str, fields))

class DocumentIndex(Singleton):
    """Singleton mapping each document id to its DocumentIndexRow, plus corpus statistics.

    Attributes:
        number_of_documents: how many distinct documents are stored.
        total_document_length: sum of ``document_length`` over the stored rows.
    """

    def __init__(self):
        # Singleton.__new__ leaves ``_index`` set to None on a fresh instance;
        # initialise only then, so later DocumentIndex() calls keep the content.
        if self._index is None:
            # NOTE: DocumentIndexRow needs arguments, so this default factory can
            # never build a row; lookups below never rely on it.
            self._index = defaultdict(DocumentIndexRow)
            self.number_of_documents = 0
            self.total_document_length = 0

    def add_document(self, doc_id: int, text: str) -> None:
        """Add a document to the index, replacing any row already stored for ``doc_id``.

        Raises:
            ValueError: if ``doc_id`` is not an int or ``text`` is not a str.
        """
        if not isinstance(doc_id, int) or not isinstance(text, str):
            raise ValueError("doc_id must be an integer and text must be a string.")

        row = DocumentIndexRow(doc_id, text)
        previous = self.get_document(doc_id)
        if previous is None:
            self.number_of_documents += 1
        else:
            # Re-adding a known document must not be counted twice: remove the
            # old length before the new one is added below.
            self.total_document_length -= previous.document_length

        self._index[doc_id] = row
        self.total_document_length += row.document_length

    def get_document(self, doc_id: int) -> "DocumentIndexRow | None":
        """Return the row stored for ``doc_id``, or None when it is not indexed.

        Raises:
            ValueError: if ``doc_id`` is not an int.
        """
        if not isinstance(doc_id, int):
            raise ValueError("doc_id must be an integer.")

        # .get() does not trigger the defaultdict factory for missing keys.
        return self._index.get(doc_id)

    def is_empty(self) -> bool:
        """Check if there are no documents in the document index."""
        return not self._index

    def get_document_ids(self) -> List[int]:
        """Return all document ids in the index as a new list."""
        return list(self._index)

    def clear_structure(self):
        """Remove every row and reset the statistics to zero."""
        self._index.clear()
        self.number_of_documents = 0
        self.total_document_length = 0

    def get_structure(self):
        """Return the underlying doc_id -> DocumentIndexRow mapping (not a copy)."""
        return self._index

In [4]:
def compute_max_term_frequency(postings_list: str) -> int:
    """
    Given a postings list of a term, compute the maximum term frequency.

    Args:
        postings_list: Whitespace-separated postings, each containing
            colon-separated values; the value after the first colon is read
            as the term frequency (e.g. "3:2 7:5").

    Returns:
        The largest term frequency, or 0 when the list holds no postings
        (empty or whitespace-only string).

    Raises:
        ValueError: if ``postings_list`` is not a string, or a frequency is
            not an integer.
        IndexError: if a posting contains no colon.
    """
    if not isinstance(postings_list, str):
        raise ValueError("Invalid postings list.")

    postings_elements = postings_list.split()
    # Covers both "" and whitespace-only input, which would otherwise fail
    # when looking at the first posting.
    if not postings_elements:
        return 0

    return max(int(posting.split(':')[1]) for posting in postings_elements)


def compute_TFIDF(tf: int, idf: float) -> float:
    """
    Compute the TF-IDF value based on the term frequency (tf) and inverse document frequency (idf).

    The weight is ``(1 + ln(tf)) * idf``.

    Args:
        tf: An integer representing the term frequency.
        idf: The inverse document frequency (a float; an int is accepted too).

    Returns:
        The TF-IDF weight, or 0.0 when ``tf`` is not positive (the logarithm
        is undefined there).

    Raises:
        ValueError: if ``tf`` is not an int or ``idf`` is not a number.
    """
    if not isinstance(tf, int) or not isinstance(idf, (int, float)):
        raise ValueError("Invalid parameters.")

    # tf == 0 must be caught as well: math.log(0) raises a domain error.
    if tf <= 0:
        return 0.0

    return (1 + math.log(tf)) * idf

def compute_IDFT(number_of_documents: int, dft:int) -> float:
    """
    Compute the inverse document frequency of a term as ``ln(number_of_documents / dft)``.

    Args:
        number_of_documents: Total number of documents in the collection.
        dft: The document frequency of the term.

    Returns:
        The inverse document frequency, or 0.0 when either argument is not
        positive (the ratio or its logarithm would be undefined).

    Raises:
        ValueError: if either argument is not an int.
    """
    if not isinstance(number_of_documents, int) or not isinstance(dft, int):
        raise ValueError("Invalid parameters.")

    # Zero must be rejected too: dft == 0 divides by zero and
    # number_of_documents == 0 asks for log(0).
    if dft <= 0 or number_of_documents <= 0:
        return 0.0

    return math.log(number_of_documents / dft)

def compute_avgDL(doc_index: DocumentIndex) -> float:
    """
    Compute the average document length based on the document index (doc_index).

    Args:
        doc_index: An instance of DocumentIndex representing the document index.

    Returns:
        ``total_document_length / number_of_documents``, or 0.0 when the
        index holds no documents.

    Raises:
        ValueError: if ``doc_index`` is not a DocumentIndex.
    """
    if not isinstance(doc_index, DocumentIndex):
        raise ValueError("Invalid parameters.")

    # An empty index has no meaningful average; avoid dividing by zero.
    if doc_index.number_of_documents == 0:
        return 0.0

    return doc_index.total_document_length / doc_index.number_of_documents

# TODO: unfinished — it is not yet clear how to compute some of the parameters.
def compute_maxBM25(doc_index: DocumentIndex,idf: float, k1:float = 1.6, b:float = 0.75):
    """Placeholder for the BM25 upper bound of a term; currently always returns None.

    The average document length is computed through ``compute_avgDL`` (so that
    function's validation of ``doc_index`` still applies), but the result is
    not used yet.
    """
    # Open question: how are BM25Tf and BM25Dl computed?
    # Intended formula once they are known:
    #   (idf * BM25Tf) / (BM25Tf + k1 * ((1 - b) + b * (BM25Dl / avgDL)))
    average_length = compute_avgDL(doc_index)
    return None


def create_lexicon(file_input_path: str, file_output_path: str, DIR_FOLDER: str, file_extension: str, block_size: int, document_index: DocumentIndex) -> int:
    """
    Build the lexicon from an inverted-index file and write it to disk in blocks.

    Every non-blank input line is read as ``<term> <posting> <posting> ...``.
    For each term a lexicon row is produced and serialized as the
    space-separated fields: term, document frequency, inverse document
    frequency, maximum term frequency, maximum TF-IDF.

    Block files are named
    ``DIR_FOLDER + file_output_path + <block number> + file_extension``
    (plain string concatenation, so ``DIR_FOLDER`` has to carry its own
    trailing path separator) and are opened in append mode by
    ``write_to_block``.

    Args:
        file_input_path: file that contains the inverted index
        file_output_path: base name of the output block files
        DIR_FOLDER: folder that will contain the output files
        file_extension: extension of the output files
        block_size: threshold compared against ``sys.getsizeof`` of the
            in-memory lexicon structure; when exceeded, the current block is
            flushed to disk
        document_index: document index used to compute the inverse document
            frequency

    Returns:
        0 on success, -1 if an IOError occurred.

    Raises:
        ValueError: if any argument fails validation.
    """
    # Check if the input file path exists and is a file
    if not file_input_path or not os.path.exists(file_input_path) or not os.path.isfile(file_input_path):
        raise ValueError("Invalid file_input_path.")

    # Check that an output file name was given
    if not file_output_path:
        raise ValueError("Invalid file_output_path.")

    # Check if DIR_FOLDER is a non-empty string
    if not DIR_FOLDER or not isinstance(DIR_FOLDER, str):
        raise ValueError("Invalid DIR_FOLDER.")

    # Check if the file extension is a non-empty string
    if not file_extension or not isinstance(file_extension, str):
        raise ValueError("Invalid file_extension.")

    # Check that block_size is a positive integer
    if not isinstance(block_size, int) or block_size <= 0:
        raise ValueError("Invalid block_size. Must be a positive integer.")

    # Check that document_index is an instance of DocumentIndex
    if not document_index or not isinstance(document_index, DocumentIndex):
        raise ValueError("Invalid document_index. Must be an instance of DocumentIndex.")

    try:
        lexicon = Lexicon()
        create_folder(DIR_FOLDER)
        nr_block = 0
        with open(file_input_path, 'r') as file:
            for line in file:
                # A line looks like "hello 3:2 3:3 ...": the term first, then its postings.
                elements = line.split()
                if not elements:
                    # Blank line: there is no term to index.
                    continue

                term = elements[0]
                postings = elements[1:]
                postings_list = ' '.join(postings)

                # The document frequency is the number of postings of the term.
                dft = len(postings)

                # The maximum term frequency among the postings.
                max_tf = compute_max_term_frequency(postings_list)

                # Flush the current block before adding once the structure grew past the threshold.
                if sys.getsizeof(lexicon.get_structure()) > block_size:
                    write_to_block(DIR_FOLDER + file_output_path + str(nr_block) + file_extension, lexicon.get_structure())
                    lexicon.clear_structure()
                    nr_block += 1

                lexicon.add_term(term, dft, document_index, max_tf)

            # Finally, save the last remaining block.
            if not lexicon.is_empty():
                write_to_block(DIR_FOLDER + file_output_path + str(nr_block) + file_extension, lexicon.get_structure())

            return 0
    except IOError as e:
        print(f"Error reading from {file_input_path}: {e}")
        return -1
        
    
class LexiconRow:
    """One lexicon entry: a term together with its collection statistics."""

    def __init__(self, term: str, dft: int, doc_index: DocumentIndex, max_tf: int):
        """Store the term and derive its IDF and maximum TF-IDF.

        Args:
            term: the term itself.
            dft: document frequency of the term.
            doc_index: document index supplying ``number_of_documents``.
            max_tf: maximum term frequency of the term.

        Raises:
            ValueError: if an argument has the wrong type.
        """
        if not (isinstance(term, str) and isinstance(doc_index, DocumentIndex)):
            raise ValueError("term must be a string and doc_index a DocumentIndex.")
        if not (isinstance(dft, int) and isinstance(max_tf, int)):
            raise ValueError("dft and max_tf must be integers.")

        self.term = term
        self.dft = dft  # document frequency
        total_documents = doc_index.number_of_documents
        self.idft = compute_IDFT(total_documents, dft)  # inverse document frequency
        self.max_tf = max_tf  # maximum term frequency
        self.maxTFIDF = compute_TFIDF(max_tf, self.idft)  # TF-IDF at the maximum term frequency

    def to_string(self):
        """Serialize the row as "<term> <dft> <idft> <max_tf> <maxTFIDF>"."""
        fields = (self.term, self.dft, self.idft, self.max_tf, self.maxTFIDF)
        return ' '.join(str(field) for field in fields)

class Lexicon(Singleton):
    """Singleton vocabulary mapping each term to its LexiconRow.

    Note that ``__init__`` runs on every ``Lexicon()`` call and installs a
    fresh, empty vocabulary each time.
    """

    def __init__(self):
        # NOTE: LexiconRow needs arguments, so this default factory can never
        # build a row; lookups below never rely on it.
        self._vocabulary = defaultdict(LexiconRow)

    def add_term(self, term: str, dft: int, document_index: DocumentIndex, maxTf: int) -> None:
        """Add a term to the lexicon, replacing any row already stored for it.

        Raises:
            ValueError: if an argument has the wrong type.
        """
        if not isinstance(term, str) or not isinstance(dft, int) or not isinstance(document_index, DocumentIndex) or not isinstance(maxTf, int):
            raise ValueError("There's an error in parameter's type.")

        self._vocabulary[term] = LexiconRow(term, dft, document_index, maxTf)

    def get_terms(self, term: str) -> "LexiconRow | None":
        """Return the row stored for ``term``, or None when the term is unknown.

        Raises:
            ValueError: if ``term`` is not a string.
        """
        if not isinstance(term, str):
            raise ValueError("Term must be a string.")

        # .get() does not trigger the defaultdict factory for missing keys.
        return self._vocabulary.get(term)

    def is_empty(self) -> bool:
        """Check if there is no term in the lexicon."""
        return not self._vocabulary

    def get_term(self) -> List[str]:
        """Return all terms in the lexicon as a new list."""
        return list(self._vocabulary)

    def clear_structure(self):
        """Remove every row from the lexicon."""
        self._vocabulary.clear()

    def get_structure(self):
        """Return the underlying term -> LexiconRow mapping (not a copy)."""
        return self._vocabulary