# Assignment 1


---

# Hamdan Abdollah - 2021186

In [None]:
import os
import re
import pickle
from collections import defaultdict
from nltk.stem import PorterStemmer
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext

# Preprocessing

In [None]:
# Shared Porter stemmer instance; preprocess() applies it to document tokens
# and evaluate_query() applies it to query terms so both sides use the same stems.
stemmer = PorterStemmer()

In [None]:
def preprocess(text, stopwords):
    """Turn raw text into a list of stemmed, stopword-free tokens.

    The text is lowercased and split into word tokens with a regex. Tokens
    found in ``stopwords`` are dropped (the comparison happens before
    stemming), and every remaining token is stemmed with the module-level
    ``stemmer``.

    Args:
        text: Raw document text.
        stopwords: Container of words to discard; only membership is tested.

    Returns:
        The stemmed tokens, in their original order.
    """
    words = re.findall(r'\b\w+\b', text.lower())
    stemmed_tokens = []
    for word in words:
        if word in stopwords:
            continue  # stopwords are matched on the unstemmed, lowercased form
        stemmed_tokens.append(stemmer.stem(word))
    return stemmed_tokens

In [None]:
# making the inverted index
def build_inverted_index(abstracts, stopwords):
    """Map every stemmed term to its list of ``(doc_id, position)`` postings.

    ``position`` is the index of the token within the output of
    ``preprocess`` for that document, i.e. it is counted after stopwords
    have been removed.

    Args:
        abstracts: Mapping of doc id to raw abstract text.
        stopwords: Stopword container forwarded to ``preprocess``.

    Returns:
        A ``defaultdict(list)`` keyed by term.
    """
    postings_by_term = defaultdict(list)
    for doc_id in abstracts:
        doc_tokens = preprocess(abstracts[doc_id], stopwords)
        position = 0
        for token in doc_tokens:
            postings_by_term[token].append((doc_id, position))
            position += 1
    return postings_by_term

In [None]:
# making the positional index
def build_positional_index(inverted_index):
    """Regroup flat postings into ``term -> {doc_id: [positions]}``.

    Args:
        inverted_index: Mapping of term to a list of ``(doc_id, position)``
            pairs.

    Returns:
        A ``defaultdict(dict)`` where each term maps to a dict from doc id to
        the positions of that term in the document, in posting order. Terms
        with no postings get no entry.
    """
    docs_by_term = defaultdict(dict)
    for term, postings in inverted_index.items():
        for doc_id, position in postings:
            # setdefault creates the per-document list on first sight
            docs_by_term[term].setdefault(doc_id, []).append(position)
    return docs_by_term

# Saving the indexes to disk and loading them back

In [None]:
def save_indexes(inverted_index, positional_index, filename="indexes.pkl"):
    """Pickle both indexes into a single file.

    The file holds one dict with the keys ``"inverted_index"`` and
    ``"positional_index"``.

    Args:
        inverted_index: Index object to store under ``"inverted_index"``.
        positional_index: Index object to store under ``"positional_index"``.
        filename: Destination path; it is overwritten if it exists.
    """
    payload = {
        "inverted_index": inverted_index,
        "positional_index": positional_index,
    }
    with open(filename, "wb") as out_file:
        pickle.dump(payload, out_file)

In [None]:
def load_indexes(filename="indexes.pkl"):
    """Load the two indexes from a pickle file.

    The file must contain a dict with the keys ``"inverted_index"`` and
    ``"positional_index"``.

    Args:
        filename: Path of the pickle file to read.

    Returns:
        A ``(inverted_index, positional_index)`` tuple.
    """
    # NOTE: unpickling can execute arbitrary code, so only load index files
    # that come from a trusted source.
    with open(filename, "rb") as in_file:
        stored = pickle.load(in_file)
    return stored["inverted_index"], stored["positional_index"]

# Handling of Multiple Terms

In [None]:
# function for AND query for multiple terms
def process_and_query(terms, positional_index):
    """Return the doc ids that contain every term in ``terms``.

    Args:
        terms: Sequence of (already stemmed) terms.
        positional_index: Mapping of term to ``{doc_id: positions}``.

    Returns:
        A set of doc ids; empty when ``terms`` is empty or any term is
        missing from the index.
    """
    if not terms:
        return set()
    # one doc-id set per term; an unknown term contributes an empty set
    doc_sets = [set(positional_index.get(term, {})) for term in terms]
    return set.intersection(*doc_sets)

In [None]:
# function for OR query for multiple terms
def process_or_query(terms, positional_index):
    """Return the doc ids that contain at least one term in ``terms``.

    Args:
        terms: Sequence of (already stemmed) terms.
        positional_index: Mapping of term to ``{doc_id: positions}``.

    Returns:
        A set of doc ids; empty when ``terms`` is empty or no term is in the
        index.
    """
    if not terms:
        return set()
    # iterating a postings dict yields its doc ids; unknown terms add nothing
    return set().union(*(positional_index.get(term, {}) for term in terms))

In [None]:
# processing NOT Query
def process_not_query(term, positional_index, all_doc_ids):
    """Return the doc ids that do NOT contain ``term``.

    Args:
        term: The (already stemmed) term to exclude.
        positional_index: Mapping of term to ``{doc_id: positions}``.
        all_doc_ids: Set of every doc id in the collection.

    Returns:
        ``all_doc_ids`` minus the documents that contain ``term``.
    """
    docs_with_term = set(positional_index.get(term, {}))
    return all_doc_ids.difference(docs_with_term)

In [None]:
# proximity query
def process_proximity_query(term1, term2, k, positional_index):
    """Return the doc ids where ``term1`` and ``term2`` occur close together.

    A document matches when some occurrence of ``term1`` and some occurrence
    of ``term2`` have positions that differ by at most ``k + 1`` (i.e. at
    most ``k`` tokens in between), in either order.

    Args:
        term1: First (already stemmed) term.
        term2: Second (already stemmed) term.
        k: Maximum number of tokens allowed between the two terms.
        positional_index: Mapping of term to ``{doc_id: positions}``.

    Returns:
        A set of matching doc ids; empty if either term is not indexed.
    """
    postings1 = positional_index.get(term1, {})
    postings2 = positional_index.get(term2, {})
    max_gap = k + 1  # k words apart means the positions differ by <= k + 1

    result = set()
    # only documents that contain both terms can match
    for doc_id in postings1.keys() & postings2.keys():
        positions2 = postings2[doc_id]
        # any() stops at the first qualifying pair, so the remaining
        # positions of this document are not scanned after a match
        if any(
            abs(pos1 - pos2) <= max_gap
            for pos1 in postings1[doc_id]
            for pos2 in positions2
        ):
            result.add(doc_id)
    return result

# Operators are split out of the query in this order

> AND,

>  then OR

> and then NOT

In [None]:
def _split_on_operator(tokens, operator):
    """Split ``tokens`` into operand token lists around each ``operator`` token."""
    operands = [[]]
    for token in tokens:
        if token == operator:
            operands.append([])
        else:
            operands[-1].append(token)
    return operands


def _evaluate_tokens(tokens, positional_index, all_doc_ids):
    """Evaluate an already-stemmed token list and return the matching doc ids."""
    if not tokens:
        # an empty operand (e.g. the right side of "a and") matches nothing
        return set()

    # do AND operations first
    if "and" in tokens:
        operands = [
            _evaluate_tokens(part, positional_index, all_doc_ids)
            for part in _split_on_operator(tokens, "and")
        ]
        return set.intersection(*operands)  # intersect all results

    # do OR operations
    if "or" in tokens:
        operands = [
            _evaluate_tokens(part, positional_index, all_doc_ids)
            for part in _split_on_operator(tokens, "or")
        ]
        return set.union(*operands)  # join all results

    # do NOT operations
    if "not" in tokens:
        split_at = tokens.index("not")
        negated = tokens[split_at + 1:]
        if not negated:
            return set()  # "not" with nothing after it is malformed
        remaining = set(all_doc_ids) - _evaluate_tokens(
            negated, positional_index, all_doc_ids
        )
        if split_at:
            # "a not b": keep the left operand instead of discarding it
            remaining &= _evaluate_tokens(
                tokens[:split_at], positional_index, all_doc_ids
            )
        return remaining

    # proximity queries: exactly two terms, "/", then an integer distance
    if "/" in tokens:
        slash_at = tokens.index("/")
        terms = tokens[:slash_at]
        distance = tokens[slash_at + 1:]
        if len(terms) != 2 or len(distance) != 1:
            return set()
        try:
            k = int(distance[0])
        except ValueError:
            return set()
        return process_proximity_query(terms[0], terms[1], k, positional_index)

    # single term query
    return set(positional_index.get(" ".join(tokens), {}).keys())


def evaluate_query(query, positional_index, all_doc_ids):
    """Evaluate a Boolean or proximity query against the positional index.

    Supported forms (case-insensitive): ``t``, ``a and b``, ``a or b``,
    ``not a``, ``a not b`` (treated as ``a and not b``) and the proximity
    form ``a b /k``. The query is split on AND first, then OR, then NOT, so
    AND binds loosest: ``a or b and c`` is evaluated as ``(a or b) and c``.

    Query terms are stemmed exactly once with the module-level ``stemmer``;
    the operator words and ``/`` are left untouched. Stopwords are not
    removed from the query.

    Args:
        query: The raw query string.
        positional_index: Mapping of term to ``{doc_id: positions}``.
        all_doc_ids: Every doc id in the collection (needed for NOT).

    Returns:
        A set of matching doc ids. Malformed queries (empty operand, a
        proximity query without exactly two terms or without an integer
        distance) yield an empty set rather than ``None`` or an exception.
    """
    operators = ("and", "or", "not", "/")
    # pad "/" so that "term/3" and "term /3" tokenize the same way and the
    # term in front of the slash is still stemmed on its own
    raw_tokens = query.lower().replace("/", " / ").split()
    # applying stemming to the query terms only
    tokens = [
        token if token in operators else stemmer.stem(token)
        for token in raw_tokens
    ]
    return _evaluate_tokens(tokens, positional_index, all_doc_ids)

In [None]:
import os
import chardet

def load_abstracts(directory):
    """Read every abstract file in ``directory`` into a dict keyed by doc id.

    The doc id is the integer before the first ``.`` in the file name. Each
    file's encoding is guessed with ``chardet`` and the file is then read as
    text using that encoding. Files that cannot be processed (non-numeric
    name, unreadable, undecodable) are reported on stdout and skipped, so one
    bad file does not abort the whole load. Entries that are not regular
    files are skipped silently.

    Args:
        directory: Path of the folder holding the abstract files.

    Returns:
        A dict mapping integer doc id to the file's text.
    """
    abstracts = {}
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if not os.path.isfile(filepath):
            # sub-directories cannot be opened as abstracts
            continue

        try:
            doc_id = int(filename.split('.')[0])

            # Detect the file encoding
            with open(filepath, 'rb') as file:
                encoding = chardet.detect(file.read())['encoding']

            # Read the file using the detected encoding; chardet reports
            # None when it cannot guess, in which case fall back to UTF-8
            # instead of the platform-dependent default
            with open(filepath, 'r', encoding=encoding or 'utf-8') as file:
                abstracts[doc_id] = file.read()
        except (OSError, ValueError, LookupError) as e:
            # ValueError covers a non-numeric name and decode errors;
            # LookupError covers an encoding name Python does not know
            print(f"Error reading file {filename}: {e}")

    return abstracts

In [None]:
# Load Stopwords
def load_stopwords(stopword_file):
    """Load the stopword list (one word per line) as a set.

    Each line is stripped of surrounding whitespace and lowercased, and blank
    lines are dropped. ``preprocess`` compares stopwords against lowercased
    word tokens, so an entry with stray whitespace or capitals would
    otherwise never match anything.

    Args:
        stopword_file: Path of a UTF-8 text file with one stopword per line.

    Returns:
        A set of normalized stopwords.
    """
    with open(stopword_file, 'r', encoding='utf-8') as file:
        lines = file.read().splitlines()
    normalized = (line.strip().lower() for line in lines)
    return {word for word in normalized if word}

# Running a Simple GUI

In [None]:
class BooleanRetrievalGUI:
    """Small tkinter front end for running queries against the index.

    The window has a query entry with Submit/Clear buttons, a scrollable
    result area and a status bar. Queries are evaluated with the module-level
    ``evaluate_query`` function.
    """

    def __init__(self, positional_index, all_doc_ids):
        """Build the window and its widgets.

        Args:
            positional_index: Index passed through to ``evaluate_query``.
            all_doc_ids: Every doc id in the collection, also passed through.
        """
        self.positional_index = positional_index
        self.all_doc_ids = all_doc_ids

        # main window
        self.root = tk.Tk()
        self.root.title("Boolean Retrieval Model")
        self.root.geometry("900x400")  # initial window size
        self.root.minsize(600, 400)  # minimum window size
        # NOTE(review): the stretch weight is on column 0 (the labels) while
        # the entry and result area sit in column 1 - confirm this is intended.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(1, weight=1)

        # style
        self.style = ttk.Style()
        self.style.configure("TLabel", font=("Arial", 12))
        self.style.configure("TButton", font=("Arial", 12))
        self.style.configure("TEntry", font=("Arial", 12))

        # query input field
        self.query_label = ttk.Label(self.root, text="Enter your query:")
        self.query_label.grid(row=0, column=0, padx=10, pady=10, sticky="w")
        self.query_entry = ttk.Entry(self.root, width=50)
        self.query_entry.grid(row=0, column=1, padx=10, pady=10, sticky="ew")
        self.query_entry.bind("<Return>", lambda event: self.process_query())  # using enter key to submit query

        # buttons
        self.button_frame = ttk.Frame(self.root)
        self.button_frame.grid(row=0, column=2, padx=10, pady=10, sticky="e")
        self.submit_button = ttk.Button(self.button_frame, text="Submit", command=self.process_query)
        self.submit_button.pack(side="left", padx=5)
        self.clear_button = ttk.Button(self.button_frame, text="Clear", command=self.clear_results)
        self.clear_button.pack(side="left", padx=5)

        # results display
        self.result_label = ttk.Label(self.root, text="Result Set:")
        self.result_label.grid(row=1, column=0, padx=10, pady=10, sticky="nw")
        self.result_text = scrolledtext.ScrolledText(self.root, wrap=tk.WORD, font=("Arial", 12))
        self.result_text.grid(row=1, column=1, columnspan=2, padx=10, pady=10, sticky="nsew")

        # status bar
        self.status_var = tk.StringVar()
        self.status_var.set("Ready")
        self.status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.grid(row=2, column=0, columnspan=3, sticky="ew")

    def process_query(self):
        """Evaluate the query in the entry field and show the sorted result.

        An empty or whitespace-only query triggers a warning dialog. If the
        evaluation raises, the error is shown in a dialog and the status bar
        is updated, instead of being left at "Processing query...".
        """
        query = self.query_entry.get().strip()
        if not query:
            messagebox.showwarning("Input Error", "Please enter a query.")
            return

        self.status_var.set("Processing query...")
        self.root.update_idletasks()  # update the status bar immediately

        try:
            result = evaluate_query(query, self.positional_index, self.all_doc_ids)
        except Exception as error:  # UI boundary: report the failure to the user
            self.status_var.set("Query failed")
            messagebox.showerror("Query Error", f"Could not process query: {error}")
            return

        self.result_text.delete(1.0, tk.END)
        # treat a missing (None) result as an empty result set
        self.result_text.insert(tk.END, f"Result-Set: {sorted(result or ())}")

        self.status_var.set("Query processed successfully")

    def clear_results(self):
        """Empty the query field and result area and reset the status bar."""
        self.query_entry.delete(0, tk.END)
        self.result_text.delete(1.0, tk.END)
        self.status_var.set("Ready")

    def run(self):
        """Enter the tkinter main loop; returns when the window is closed."""
        self.root.mainloop()

In [None]:
def main():
    """Load the corpus, obtain the indexes and start the GUI.

    Abstracts are read from the ``Abstracts`` folder and stopwords from
    ``Stopword-List.txt``. If ``indexes.pkl`` exists the indexes are loaded
    from it; otherwise they are built from the abstracts and saved there.
    """
    abstracts = load_abstracts("Abstracts")
    stopwords = load_stopwords("Stopword-List.txt")

    # reuse the indexes on disk when present, otherwise build and store them
    index_path = "indexes.pkl"
    if not os.path.exists(index_path):
        print("Building indexes...")
        inverted_index = build_inverted_index(abstracts, stopwords)
        positional_index = build_positional_index(inverted_index)
        save_indexes(inverted_index, positional_index, index_path)
    else:
        print("Loading indexes from disk...")
        inverted_index, positional_index = load_indexes(index_path)

    # NOT queries need the full set of document ids
    every_doc_id = set(abstracts.keys())

    app = BooleanRetrievalGUI(positional_index, every_doc_id)
    app.run()

if __name__ == "__main__":
    main()

Loading indexes from disk...
