In [1]:
import os
import random
import lxml
import nltk
from bs4 import BeautifulSoup
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from string import punctuation
from typing import List, Dict, Tuple, Union
import string
# nltk.download('punkt')
# nltk.download('stopwords')

# Directory holding the raw dataset files: resolved relative to the current working
# directory, two levels up, under Dataset/CSE508_Winter2023_Dataset/CSE508_Winter2023_Dataset.
ORIGINAL_PATH = os.path.join(os.getcwd(), os.pardir, os.pardir, 'Dataset', 'CSE508_Winter2023_Dataset', 'CSE508_Winter2023_Dataset')
# Directory (inside the current working directory) that receives the extracted and
# preprocessed copies of the documents.
ALTERED_PATH = os.path.join(os.getcwd(), 'DatasetAlter')


In [None]:
def create_alter(seed=1):
	"""
	Extract the title and body text of every file in ORIGINAL_PATH into ALTERED_PATH.

	Each source file is parsed with BeautifulSoup's 'lxml' parser. The stripped <title>
	string and the stripped text of the first <text> element are joined with a single
	space and written to a file of the same name under ALTERED_PATH, which is created
	when it does not exist yet. The last four characters of each filename are read as
	a document number; for five numbers sampled from 1..1400 a before/after dump is printed.

	:param seed: Seed for the `random` module; fixes which five documents are printed.
	"""

	divider = "----------------------------------"

	random.seed(seed)
	preview_ids = random.sample(range(1, 1401), 5)

	if not os.path.exists(ALTERED_PATH):
		os.makedirs(ALTERED_PATH)

	for doc_name in os.listdir(ORIGINAL_PATH):
		source_file = os.path.join(ORIGINAL_PATH, doc_name)
		target_file = os.path.join(ALTERED_PATH, doc_name)

		with open(source_file, 'r') as src:
			raw_markup = src.read()

		# Keep only the human-readable parts: "<title> <text>".
		parsed = BeautifulSoup(raw_markup, 'lxml')
		title_part = parsed.title.string.strip()
		body_part = parsed.find('text').text.strip()
		extracted = title_part + " " + body_part

		# Show the transformation for the sampled documents only.
		if int(doc_name[-4:]) in preview_ids:
			print(divider)
			print("Filename: ", doc_name)
			print(divider)
			print("Before: ")
			print(raw_markup)
			print(divider)
			print("After: ")
			print(extracted)
			print(divider)

		with open(target_file, 'w') as dst:
			dst.write(extracted)

	print("Finished Processing")

In [None]:
def clear_alter(path=None):
	"""
	Delete every entry directly inside the altered-dataset directory.

	A missing directory is treated as already clear, so this can safely run before
	the directory has been created for the first time.

	:param path: Directory to empty. Defaults to ALTERED_PATH when omitted.
	"""
	target = ALTERED_PATH if path is None else path

	# Nothing to clear on a fresh checkout where the directory was never created.
	if not os.path.isdir(target):
		return

	for filename in os.listdir(target):
		os.remove(os.path.join(target, filename))


In [None]:
# Remove the files currently in ALTERED_PATH, then regenerate them from ORIGINAL_PATH
# (create_alter runs with its default seed=1, which selects the printed samples).
clear_alter()
create_alter()

In [None]:
def preprocess(seed=1):
	"""
	Normalise every file in ALTERED_PATH in place.

	Steps, in order: lower-case the text, tokenize with `word_tokenize`, drop English
	stop words, drop punctuation tokens, drop blank tokens, then re-join the tokens
	with single spaces and overwrite the file. The last four characters of each
	filename are read as a document number; for five numbers sampled from 1..1400
	the result of every step is printed.

	:param seed: Seed for the `random` module; fixes which five documents are printed.
	"""
	divider = "----------------------------------"

	def show_stage(label, tokens):
		# Dump the token list produced by one preprocessing stage.
		print(divider)
		print(label)
		print("After: ")
		print(tokens)
		print(divider)

	random.seed(seed)
	random_samples = set(random.sample(range(1, 1401), 5))

	# Load the stop-word list once; reloading it for every token made the stop-word
	# filter by far the slowest step.
	stop_words = set(stopwords.words('english'))

	for filename in os.listdir(ALTERED_PATH):
		file_path = os.path.join(ALTERED_PATH, filename)

		with open(file_path, 'r') as f:
			original = f.read()

		content = original.lower()

		# Decide once per file whether its intermediate results are printed.
		is_sample = int(filename[-4:]) in random_samples

		if is_sample:
			print(divider)
			print("LOWERCASE")
			print(divider)
			print("Filename: ", filename)
			print(divider)
			print("Before: ")
			print(original)
			print(divider)
			print("After: ")
			print(content)
			print(divider)

		content = word_tokenize(content)
		if is_sample:
			show_stage("TOKENIZE", content)

		content = [w for w in content if w not in stop_words]
		if is_sample:
			show_stage("STOPWORDS", content)

		# NOTE(review): `punctuation` is a string, so this is a substring test. It drops
		# single punctuation characters (and runs such as "./" that occur verbatim in
		# string.punctuation) but keeps tokens such as "''" or "--". Kept as-is so the
		# generated files do not change; confirm whether that is intended.
		content = [w for w in content if w not in punctuation]
		if is_sample:
			show_stage("PUNCTUATION", content)

		content = [w for w in content if w.strip()]
		if is_sample:
			show_stage("BLANKSPACE", content)

		content = " ".join(content)

		with open(file_path, 'w') as fa:
			fa.write(content)

	print("Finished Processing")


In [None]:
# Run the in-place preprocessing over ALTERED_PATH with the default seed=1.
preprocess()

In [None]:
class BooleanQueries:
    """
    Inverted index over a directory of text documents, with merge-based boolean queries.

    Every posting list holds document file names in ascending string order. All query
    methods rely on that ordering and return newly built lists, so callers may modify
    a result without affecting the index.
    """

    def __init__(self, path: str) -> None:
        """
        Initialize the BooleanQueries object.
        :param path: The path to the collection of documents.
        """

        self.docs: int = 0
        # Sorted names of all indexed documents; the universe used by NOT queries.
        self.doc_names: List[str] = []
        # term -> (document frequency, sorted posting list of file names)
        self.index: Dict[str, Tuple[int, List[str]]] = {}
        self.PATH: str = path
        self.buildIndex()


    def buildIndex(self) -> None:
        """
        Build the inverted index by processing each document in the collection.
        Process includes lower-casing, tokenizing, removing stopwords, punctuations and blank spaces.
        Only regular files directly inside the collection directory are indexed.
        Any previously built index is replaced.
        """

        # Loaded once instead of once per token.
        stop_words = set(stopwords.words("english"))
        strip_punctuation = str.maketrans("", "", string.punctuation)

        # os.listdir() returns names in arbitrary order; sorting here means every
        # posting list is built already sorted and NOT has a sorted universe.
        self.doc_names = sorted(
            name for name in os.listdir(self.PATH)
            if os.path.isfile(os.path.join(self.PATH, name))
        )
        self.docs = len(self.doc_names)

        postings: Dict[str, List[str]] = {}
        for filename in self.doc_names:
            with open(os.path.join(self.PATH, filename), 'r') as f:
                content = f.read()

            # Tokenize and lower case
            tokens = word_tokenize(content.lower())
            # Remove stopwords
            tokens = [token for token in tokens if token not in stop_words]
            # Remove punctuation
            tokens = [token.translate(strip_punctuation) for token in tokens]
            # Remove blank spaces and collapse repeats: a document appears at most
            # once in any posting list.
            terms = {token for token in tokens if token.strip()}

            for term in terms:
                postings.setdefault(term, []).append(filename)

        # The stored frequency is the document frequency (length of the posting list).
        self.index = {term: (len(names), names) for term, names in postings.items()}

        print("Finished Building Index")


    def getPostingList(self, term: str) -> Tuple[int, List[str]]:
        """
        Get the posting list for a term.
        :param term: The term to get the posting list for.
        :return: The number of documents containing the term and its posting list; (0, []) for an unknown term.
        """

        return self.index.get(term, (0, []))


    def getTotalDocs(self) -> int:
        """
        Get the total number of documents in the collection.
        :return: The total number of documents in the collection.
        """

        return self.docs


    def _postings(self, term: Union[str, List[str]]) -> List[str]:
        """
        Resolve a query operand to a posting list.
        :param term: A term to look up, or an already computed posting list.
        :return: The list itself when a list is given, the indexed posting list for a known term, otherwise [].
        """

        if isinstance(term, list):
            return term
        entry = self.index.get(term)
        return entry[1] if entry is not None else []


    def queryAND(self, term1: Union[str, List[str]], term2: Union[str, List[str]]) -> Tuple[List[str], int]:
        """
        Perform a boolean AND query on the inverted index.
        :param term1: The first term to query or a posting list.
        :param term2: The second term to query or a posting list.
        :return: A list of document names that contain both terms and the number of comparisons made.
        """

        postings1 = self._postings(term1)
        postings2 = self._postings(term2)

        # Standard sorted-list intersection; an empty operand ends the loop immediately.
        comparisons = 0
        result = []
        i = 0
        j = 0
        while i < len(postings1) and j < len(postings2):
            comparisons += 1
            if postings1[i] == postings2[j]:
                result.append(postings1[i])
                i += 1
                j += 1
            elif postings1[i] < postings2[j]:
                i += 1
            else:
                j += 1

        return result, comparisons

    def queryOR(self, term1: Union[str, List[str]], term2: Union[str, List[str]]) -> Tuple[List[str], int]:
        """
        Perform a boolean OR query on the inverted index.
        :param term1: The first term to query or a posting list.
        :param term2: The second term to query or a posting list.
        :return: A list of document names that contain at least one of the terms and the number of comparisons made.
        """

        postings1 = self._postings(term1)
        postings2 = self._postings(term2)

        # Sorted-list union; when one side is empty no comparison is made and the
        # other side is copied by the tail handling below.
        comparisons = 0
        result = []
        i = 0
        j = 0
        while i < len(postings1) and j < len(postings2):
            comparisons += 1
            if postings1[i] == postings2[j]:
                result.append(postings1[i])
                i += 1
                j += 1
            elif postings1[i] < postings2[j]:
                result.append(postings1[i])
                i += 1
            else:
                result.append(postings2[j])
                j += 1

        result.extend(postings1[i:])
        result.extend(postings2[j:])

        return result, comparisons


    def queryNOT(self, term: Union[str, List[str]]) -> Tuple[List[str], int]:
        """
        Perform a boolean NOT query on the inverted index.
        :param term: The term to query or a posting list.
        :return: A list of document names that do not contain the term and the number of comparisons made.
        """

        postings = self._postings(term)
        # Sorted at index time; the merge below is only correct on sorted input.
        all_postings = self.doc_names

        comparisons = 0
        result = []
        i = 0
        j = 0
        while i < len(postings) and j < len(all_postings):
            comparisons += 1
            if postings[i] == all_postings[j]:
                i += 1
                j += 1
            elif postings[i] < all_postings[j]:
                # Posting for a document outside the indexed universe: skip it.
                i += 1
            else:
                result.append(all_postings[j])
                j += 1

        result.extend(all_postings[j:])

        return result, comparisons


    def queryANDNOT(self, term1: Union[str, List[str]], term2: Union[str, List[str]]) -> Tuple[List[str], int]:
        """
        Perform a boolean AND NOT query on the inverted index.
        :param term1: The first term to query or a posting list.
        :param term2: The second term to query or a posting list.
        :return: A list of document names that contain term1 but not term2 and the number of comparisons made.
        """

        postings1 = self._postings(term1)
        postings2 = self._postings(term2)

        # Sorted-list difference (postings1 minus postings2).
        comparisons = 0
        result = []
        i = 0
        j = 0
        while i < len(postings1) and j < len(postings2):
            comparisons += 1
            if postings1[i] == postings2[j]:
                i += 1
                j += 1
            elif postings1[i] < postings2[j]:
                result.append(postings1[i])
                i += 1
            else:
                j += 1

        result.extend(postings1[i:])

        return result, comparisons


    def _evaluateClause(self, tokens: List[str]) -> Tuple[List[str], int]:
        """
        Evaluate one OR-free clause of the form "[NOT] A (AND [NOT] B)*" from left to right.
        :param tokens: The non-empty, whitespace-split tokens of the clause.
        :return: The matching document names and the number of comparisons made.
        :raises ValueError: If an operator is missing or has no operand.
        """

        clause_text = " ".join(tokens)
        comparisons = 0

        # Leading operand, optionally negated. The negated result seeds the AND
        # chain so that "NOT A AND B" means (NOT A) AND B.
        if tokens[0] == "AND":
            raise ValueError("Malformed query near: '" + clause_text + "'")
        if tokens[0] == "NOT":
            if len(tokens) < 2:
                raise ValueError("Malformed query near: '" + clause_text + "'")
            current, comparisons = self.queryNOT(tokens[1])
            position = 2
        else:
            # Copy so the caller never receives the index's own list.
            current = list(self._postings(tokens[0]))
            position = 1

        while position < len(tokens):
            if tokens[position] != "AND":
                # Two operands in a row; the original loop never terminated here.
                raise ValueError("Malformed query near: '" + clause_text + "'")

            negated = position + 1 < len(tokens) and tokens[position + 1] == "NOT"
            operand_at = position + (2 if negated else 1)
            if operand_at >= len(tokens):
                raise ValueError("Malformed query near: '" + clause_text + "'")

            if negated:
                current, step = self.queryANDNOT(current, tokens[operand_at])
            else:
                current, step = self.queryAND(current, tokens[operand_at])
            comparisons += step
            position = operand_at + 1

        return current, comparisons


    def queryProcess(self, query: str) -> Tuple[List[str], int]:
        """
        Parse a query and perform the appropriate boolean query. Queries are performed in the order of NOT, AND, OR.
        :param query: The query to parse, should be of the form "A AND B OR NOT C AND NOT D" or a similar combination.
        :return: List of document names that match the query and the number of comparisons made.
        :raises ValueError: If the query is empty or an operator is missing or has no operand.
        """

        result: List[str] = []
        comparisons = 0

        # OR has the lowest precedence: evaluate each clause, then union the results.
        for clause in query.split(" OR "):
            tokens = [token for token in clause.split(" ") if token]
            if not tokens:
                raise ValueError("Malformed query: '" + query + "'")

            subresult, subcomparisons = self._evaluateClause(tokens)
            comparisons += subcomparisons

            if not result:
                # Union with an empty list needs no merge.
                result = subresult
            else:
                result, subcomparisons = self.queryOR(result, subresult)
                comparisons += subcomparisons

        return result, comparisons

In [None]:
# Interactive driver: reads a number of queries, and for each one a line of search
# terms plus a comma-separated line of operators, then runs the combined boolean query.
print("--- Boolean Queries ---")
print("Enter the full query: ")
print()

n = int(input("Enter the number of queries: "))
# NOTE(review): the preprocessing cells write to ALTERED_PATH ('DatasetAlter') while the
# index is built from 'DS2' - confirm this is the intended collection.
bq = BooleanQueries(os.path.join(os.getcwd(), 'DS2'))

# Loaded once for all queries instead of once per token.
query_stop_words = set(stopwords.words("english"))
query_strip_punctuation = str.maketrans("", "", string.punctuation)
valid_operations = {"AND", "OR", "OR NOT", "AND NOT"}

for i1 in range(n):
    print("Enter the search term: ")
    search_term = input()
    print("Enter operations: ")
    operations = input()

    # Apply the same normalisation to the query terms as the index applies to documents.
    search_term = word_tokenize(search_term.lower())
    search_term = [s for s in search_term if s not in query_stop_words]
    search_term = [s.translate(query_strip_punctuation) for s in search_term]
    search_term = [s for s in search_term if s.strip()]

    # Always a list (also for empty input); operator case and inner spacing are normalised.
    operations = [" ".join(s.split()).upper() for s in operations.split(",")]
    operations = [s for s in operations if s]

    # An unknown operator invalidates the query. Dropping it silently could leave a
    # well-formed but different query than the one the user typed.
    if any(s not in valid_operations for s in operations):
        print("Invalid query")
        continue

    if len(search_term) == 0 or len(operations) != len(search_term) - 1:
        print("Invalid query")
        continue

    # Interleave terms and operators: "t0 op0 t1 op1 t2 ...".
    query1 = search_term[0]
    for j1 in range(len(operations)):
        query1 += " " + operations[j1] + " " + search_term[j1 + 1]

    print("Query: ", query1)

    result1, comparisons1 = bq.queryProcess(query1)
    print("Result: ", result1)
    print("Frequency: ", len(result1))
    print("Comparisons: ", comparisons1)
    print()


