In [1]:
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import os
import pickle



<h1 style='background:#222;color:white;text-align:center'>Off line part</h1>

In [2]:
class FileHandler:
    @staticmethod
    def read_and_print(file_path):
        try:
            with open(file_path, 'r') as file:
                content = file.read()
                print(content)
        except FileNotFoundError:
            print("File not found.")
        except Exception as e:
            print(f"An error occurred: {str(e)}")

    @staticmethod
    def extract_and_save_text(input_file, output_folder, text_prefix=""):
        with open(input_file, 'r') as file:
            lines = file.readlines()

        doc_number = 1
        doc_list = []

        for line in lines:
            if line.startswith(text_prefix):
                file_path = os.path.join(output_folder, f'doc{doc_number}_.txt')
                with open(file_path, 'w') as new_file:
                    new_file.write(line[len(text_prefix):])  # Write the content after the text prefix
                doc_list.append(file_path)
                doc_number += 1

        return doc_list

"""# Example usage:
input_file_path = 'sample.txt'
output_folder_path = 'docs/'
text_identifier = 'TEXT:'

resulting_docs = FileHandler().extract_and_save_text(input_file_path, output_folder_path, text_identifier)
print("Documents created:", resulting_docs)"""


'# Example usage:\ninput_file_path = \'sample.txt\'\noutput_folder_path = \'docs/\'\ntext_identifier = \'TEXT:\'\n\nresulting_docs = FileHandler().extract_and_save_text(input_file_path, output_folder_path, text_identifier)\nprint("Documents created:", resulting_docs)'

In [3]:

class TextPreprocessor:
    def __init__(self, file_path, output_file_name, output_folder_path, tokenize=True, remove_stopwords=True, stemming=True):
        self.file_path = file_path
        self.output_file_name = output_file_name
        self.output_folder_path = output_folder_path
        self.tokenize = tokenize
        self.remove_stopwords = remove_stopwords
        self.stemming = stemming
        #---
        self.text = None
        self.tokens = []  # Initialize as an empty list
        self.filtered_tokens = []  # Initialize as an empty list
        self.stemmed_tokens = []  # Initialize as an empty list
        #---
        self.porter_stemmer = PorterStemmer() if stemming else None  # Initialize stemmer conditionally
        try:
            self.stop_words = set(stopwords.words("english"))
        except LookupError:
            nltk.download('stopwords', quiet=True)
            nltk.download('punkt', quiet=True)
        self.preprocess_text()
    #----------Functions------------
    def read_text(self):
        with open(self.file_path, "r", encoding="utf-8") as file:
            self.text = file.read()

    def tokenize_text(self):
        if self.text is not None:
            self.tokens = word_tokenize(self.text)
            self.tokens = [word.lower() for word in self.tokens if word.isalnum()]

    def remove_stop_words(self):
        if self.tokens:
            self.filtered_tokens = [word for word in self.tokens if word not in self.stop_words]

    def porter_stemming(self):
        if self.stemming and self.filtered_tokens and self.porter_stemmer:  # Check if stemming is enabled
            self.stemmed_tokens = [self.porter_stemmer.stem(word) for word in self.filtered_tokens]
        else:
            self.stemmed_tokens = self.filtered_tokens[:]  # If stemming is disabled, copy the tokens as they are

    def save_processed_text(self):
        if self.stemmed_tokens:  # Check if list is not empty
            processed_text = " ".join(self.stemmed_tokens)
        else:
            processed_text = " ".join(self.filtered_tokens) if self.filtered_tokens else ""
        output_path = os.path.join(self.output_folder_path, self.output_file_name)
        with open(output_path, "w", encoding="utf-8") as file:
            file.write(processed_text)

    def preprocess_text(self):
        self.read_text()
        self.tokenize_text()
        self.remove_stop_words()
        self.porter_stemming()
        self.save_processed_text()

'''# Example usage:
file_path = "./sample.txt"
output_file_name = "collection1.txt"
output_folder_path = "./"
preprocessor = TextPreprocessor(file_path, output_file_name, output_folder_path)'''

'# Example usage:\nfile_path = "./sample.txt"\noutput_file_name = "collection1.txt"\noutput_folder_path = "./"\npreprocessor = TextPreprocessor(file_path, output_file_name, output_folder_path)'

In [4]:


class PositionalInvertedIndex:
    def __init__(self,folder_path,output_folder_path):
        self.folder_path = folder_path
        self.output_folder_path = output_folder_path
        self.index = {}

    def add_term(self, term, document_id, position):
        if term not in self.index:
            self.index[term] = {'doc_freq': 0, 'postings': {}}
        if document_id not in self.index[term]['postings']:
            self.index[term]['postings'][document_id] = []
            self.index[term]['doc_freq'] += 1
        self.index[term]['postings'][document_id].append(position)

    def construct_index(self):
        files = os.listdir(self.folder_path)
        doc_id = 1

        for file in files:
            file_path = os.path.join(self.folder_path, file)
            if os.path.isfile(file_path):
                preprocessor = TextPreprocessor(file_path, f"afterPrDoc{doc_id}_.txt", self.output_folder_path)
                preprocessor.preprocess_text()
                for position, term in enumerate(preprocessor.stemmed_tokens):
                    self.add_term(term, file, position)
                doc_id += 1
                
    def print_index(self):
        for term, postings in self.index.items():
            print(f" \n Term: '{term}'  Document Frequency: {postings['doc_freq']} ")
            print(" Postings:")
            for doc_id, positions in postings['postings'].items():
                print(f"   {doc_id} :")
                print(f"   Positions: {', '.join(str(pos) for pos in positions)}")
            print()
    def save_index_as_pickle(self, file_path):
        with open(file_path, 'wb') as file:
            pickle.dump(self.index, file)       

# Usage example:
index = PositionalInvertedIndex('./docs','./docs/afterProcssDocs')
index.construct_index()

# Accessing the constructed index:
print(index.index)
index.print_index()
index.save_index_as_pickle('./indexes/index.pkl')


{'plan': {'doc_freq': 11, 'postings': {'tweet_50.txt': [0], 'tweet_41.txt': [6], 'paragraph_1.txt': [56, 369, 430, 1593], 'tweet_6.txt': [5], 'paragraph_52.txt': [381, 606, 624], 'paragraph_12.txt': [329], 'tweet_194.txt': [3], '_paragraph_48.txt': [327], '_paragraph_52.txt': [381, 606, 624], '_paragraph_12.txt': [329], 'paragraph_48.txt': [327]}}, 'day': {'doc_freq': 35, 'postings': {'tweet_50.txt': [1], 'tweet_202.txt': [4], 'paragraph_17.txt': [7], 'paragraph_51.txt': [97], '01 (34th copy).txt': [468, 826, 1189], 'tweet_78.txt': [4], 'tweet_120.txt': [4], 'paragraph_2.txt': [278], '01 (copy).txt': [32], 'paragraph_13.txt': [8], 'paragraph_1.txt': [372, 386, 431, 618, 640, 806, 845, 852, 968, 978, 1167, 1592, 1655], '_paragraph_13.txt': [8], '_paragraph_2.txt': [278], '_paragraph_51.txt': [97], 'paragraph_52.txt': [86], 'paragraph_12.txt': [341], 'paragraph_42.txt': [97], 'tweet_194.txt': [2], 'tweet_105.txt': [4], '_paragraph_48.txt': [165], 'tweet_143.txt': [2], 'tweet_75.txt': [4]

<h1 style='background:#222;color:white;text-align:center'>Online part</h1>

In [5]:

class Query:
    def __init__(self, words):
        if isinstance(words, str):
            words = [words]  # Convert a single word string to a list
        self.words = words
        self.stemmer = PorterStemmer()
        self.stop_words = set(stopwords.words("english"))


    
    def remove_stop_words(self):
        if self.words is None:
            return []

        final_words = []
        lower_stop_words = set(map(str.lower, self.stop_words))
        for word in self.words:
            if (word.lower() not in lower_stop_words):
                final_words.append(word)

        return final_words


    def stem_words(self , words):
        if  words is None:
            return []

        return [self.stemmer.stem(word) for word in  words]

    def process_text(self, remove_stopwords=True, stemming=True):

        if remove_stopwords:
            processed = self.remove_stop_words()
             

        if stemming:
            processed = self.stem_words(processed)

        return processed

# Example usage
phrase_terms = ['GAZA', 'TO', 'Egybt']
print(phrase_terms)

query = Query(phrase_terms)
processed_terms = query.process_text(remove_stopwords=True, stemming=True)
print(processed_terms)


['GAZA', 'TO', 'Egybt']
['gaza', 'egybt']


In [6]:
import pickle

class BooleanSearch:
    @staticmethod
    def load_index_from_pickle(file_path):
        with open(file_path, 'rb') as file:
            return pickle.load(file)

    @staticmethod
    def boolean_search(query, positional_index):
        query_terms = query.split()
        and_terms = [term for term in query_terms if term != 'AND' and term != 'OR' and term != 'NOT']
        operators = [term for term in query_terms if term == 'AND' or term == 'OR' or term == 'NOT']
        
        query = Query(and_terms)
        and_terms = query.process_text(remove_stopwords=True, stemming=True)
        
        result = set(positional_index.get(and_terms[0], {}).get('postings', {}).keys())

        for i in range(len(operators)):
            if operators[i] == 'AND':
                term_docs = set(positional_index.get(and_terms[i + 1], {}).get('postings', {}).keys())
                result = result.intersection(term_docs)
            elif operators[i] == 'OR':
                term_docs = set(positional_index.get(and_terms[i + 1], {}).get('postings', {}).keys())
                result = result.union(term_docs)
            elif operators[i] == 'NOT':
                term_docs = set(positional_index.get(and_terms[i + 1], {}).get('postings', {}).keys())
                result = result.difference(term_docs)

        return list(result)


# Usage example:
index_file_path = "./indexes/index.pkl"  # Replace with your .pkl file path
positional_index = BooleanSearch.load_index_from_pickle(index_file_path)

query = "Hamas AND GAZA"
result_docs = BooleanSearch.boolean_search(query, positional_index)
print("Documents matching the query:", result_docs,"\n")
for file in result_docs:
    FileHandler().read_and_print('./docs/'+file)


Documents matching the query: ['paragraph_12.txt', '01 (12th copy).txt', '01 (13th copy).txt', 'paragraph_2.txt', '_paragraph_48.txt', '01 (34th copy).txt', '_paragraph_52.txt', 'paragraph_52.txt', 'paragraph_33.txt', '_paragraph_43.txt', '_paragraph_2.txt', 'paragraph_51.txt', 'paragraph_42.txt', '01 (6th copy).txt', 'paragraph_43.txt', 'tweet_196.txt', '_paragraph_33.txt', 'paragraph_1.txt', '_paragraph_51.txt', 'tweet_143.txt', '_paragraph_12.txt', 'paragraph_29.txt', 'paragraph_48.txt', '_paragraph_29.txt', '01 (8th copy).txt'] 

Paragraph 197 (middle east conflict):
The US secretary of state was speaking in Turkey, on trip that will take him to Jordan, Egypt and Israel US secretary of state Antony Blinken has urged Middle East countries to use their influence over regional actors to ensure the Gaza conflict is contained and prevent “an endless cycle of violence,” as he continued his week-long trip aimed at calming tensions. Blinken was speaking on Saturday, after Lebanon’s Iranian

In [7]:
class PhraseSearch:
    def __init__(self, index_file):
        self.index = self.load_index(index_file)

    def load_index(self, index_file):
        with open(index_file, 'rb') as file:
           return pickle.load(file)

    def phrase_search(self, query):
        and_terms = query.split()
        query = Query(and_terms)
        terms = query.process_text(remove_stopwords=True, stemming=True)

        # Initialize a dictionary to store matching documents and positions
        phrase_positions = {}

        # Get postings for the first term in the query
        first_term = terms[0]
        if first_term in self.index:
            first_term_postings = self.index[first_term]['postings']

            # Loop through each document containing the first term
            for doc, positions in first_term_postings.items():
                # Check for the presence of subsequent terms in order
                pos = positions[0]  # Consider the first position of the first term

                # Check if the remaining terms exist in consecutive positions in the document
                for i in range(1, len(terms)):
                    next_term = terms[i]
                    if doc in self.index.get(next_term, {}).get('postings', {}):
                        next_term_positions = self.index[next_term]['postings'][doc]
                        if pos + i in next_term_positions:
                            pos = pos + i  # Move to the next position in the document
                        else:
                            break  # Move to the next document if positions are not consecutive
                    else:
                        break  # Move to the next document if the term is not found

                else:  # If all terms are found in order in the document
                    if doc not in phrase_positions:
                        phrase_positions[doc] = [pos - len(terms) + 1]
                    else:
                        phrase_positions[doc].append(pos - len(terms) + 1)

        return phrase_positions
# Example usage:
index_file_path = './indexes/index.pkl'  # Replace this with your file path
phrase_searcher = PhraseSearch(index_file_path)

# Perform a phrase search for a query
query = "Palestinian boy"
results = phrase_searcher.phrase_search(query)

# Display results
if results:
    print(f"Phrase '{query}' found in the following positions:")
    for doc, positions in results.items():
         
        
        for pos in positions:
            print(f"- Document: {doc}, Starting Position: {pos}")
            FileHandler().read_and_print('./docs/'+doc)
else:
    print(f"No matches found for the phrase '{query}'")



Phrase 'Palestinian boy' found in the following positions:
- Document: tweet_25.txt, Starting Position: 2
Awni Eldous: The Palestinian boy who found YouTube fame after death
https://www.bbc.co.uk/news/world-middle-east-67788360


In [8]:
import math
from collections import defaultdict
import pickle

class TFIDF:
    def __init__(self, inverted_index_path):
        with open(inverted_index_path, 'rb') as file:
            self.inverted_index = pickle.load(file)
        self.total_docs = len(self.inverted_index[next(iter(self.inverted_index))]['postings'])
        self.document_frequency = {term: self.inverted_index[term]['doc_freq'] for term in self.inverted_index}

    def preprocess_query(self, query):
        query_terms_l = query.split()
        query_c = Query(query_terms_l)  # Assuming Query class isn't defined in the provided code
        processed_query = query_terms_l  # Skipping preprocessing for demonstration
        return processed_query

    def calculate_tfidf(self, query):
        query_terms = self.preprocess_query(query)
        term_frequency = defaultdict(lambda: defaultdict(int))
        idf = {}
        tfidf = defaultdict(float)

        # Calculate term frequency in the query
        query_term_freq = defaultdict(int)
        for term in query_terms:
            query_term_freq[term] += 1

        # Calculate TF-IDF for each term in the query
        for term, query_freq in query_term_freq.items():
            if term in self.inverted_index:
                idf[term] = 1 + math.log10(query_freq) * math.log10(self.total_docs / self.document_frequency[term])
                for doc, positions in self.inverted_index[term]['postings'].items():
                    term_frequency[term][doc] = len(positions)
                    tfidf[doc] += term_frequency[term][doc] * idf[term]

        # Normalize TF-IDF scores
        #for doc in tfidf:
          #  tfidf[doc] /= len(doc.split('_')[0])  # Normalize by the length of the document name

        # Rank documents based on TF-IDF scores
        ranked_docs = sorted(tfidf.items(), key=lambda x: x[1], reverse=True)
        return ranked_docs

# Example usage:
inverted_index_path = './indexes/index.pkl'
tfidf_model = TFIDF(inverted_index_path)

query = "Hamas war"
ranked_documents = tfidf_model.calculate_tfidf(query)
print(ranked_documents, "Ranked Documents based on modified TF-IDF scores:")
i=0
for doc, score in ranked_documents:
    print(f"Document: {doc}, TF-IDF Score: {score}")
    i=i+1
    if (i==10):
        break
    #FileHandler().read_and_print('./docs/'+doc)


[('paragraph_1.txt', 51.0), ('paragraph_2.txt', 8.0), ('_paragraph_2.txt', 8.0), ('_paragraph_48.txt', 6.0), ('paragraph_48.txt', 6.0), ('01 (34th copy).txt', 3.0), ('paragraph_14.txt', 3.0), ('paragraph_33.txt', 3.0), ('_paragraph_33.txt', 3.0), ('_paragraph_14.txt', 3.0), ('paragraph_51.txt', 2.0), ('paragraph_44.txt', 2.0), ('_paragraph_44.txt', 2.0), ('paragraph_13.txt', 2.0), ('paragraph_34.txt', 2.0), ('paragraph_46.txt', 2.0), ('_paragraph_46.txt', 2.0), ('_paragraph_13.txt', 2.0), ('paragraph_41.txt', 2.0), ('_paragraph_51.txt', 2.0), ('paragraph_52.txt', 2.0), ('paragraph_42.txt', 2.0), ('_paragraph_41.txt', 2.0), ('_paragraph_52.txt', 2.0), ('paragraph_36.txt', 2.0), ('tweet_99.txt', 1.0), ('tweet_202.txt', 1.0), ('tweet_96.txt', 1.0), ('tweet_117.txt', 1.0), ('_paragraph_31.txt', 1.0), ('tweet_78.txt', 1.0), ('tweet_1.txt', 1.0), ('tweet_120.txt', 1.0), ('tweet_21.txt', 1.0), ('tweet_186.txt', 1.0), ('tweet_164.txt', 1.0), ('01 (21st copy).txt', 1.0), ('paragraph_31.txt', 1.