In [2]:
pip install nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!cd drive/MyDrive/Colab\ Notebooks
# set path to /content/.... as absolute path instead?

/bin/bash: line 0: cd: drive/MyDrive/Colab Notebooks: No such file or directory


In [5]:
!ls

drive  sample_data


In [6]:
# All imports
import os
import re
import string
import time
from collections import defaultdict

import pandas as pd
import nltk
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import TweetTokenizer, MWETokenizer
from nltk.stem import WordNetLemmatizer

# for use in removing stop words
nltk.download('stopwords')

# required for pos tagging
nltk.download('averaged_perceptron_tagger')

# required for lemmatization
nltk.download('wordnet')
# required for wordnet
nltk.download('omw-1.4')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...


True

In [9]:
class InvertedIndex:
    """
    Construct Inverted Index
    """
    def __init__(self):
        """
        Initialize any instance variables that have to be set
        """
        # Window size for Task 2, initial window size is 2 since we will be only searching with two terms
        self.window_size = 2
        # Using TweetTokenizer as initial tokenizer
        # See report for justification
        self.unigram_tokenizer = TweetTokenizer()
        # Declaring mwe_tokenizer as instance var - is initialized later
        self.mwe_tokenizer = None
        # Initializing file map as empty dict
        self.file_map = {}
        # Initialize distionary that stores inverted index
        self.inv_index = {}
        # Set of stop words downloaded from nltk
        self.stop_words = set(stopwords.words("english"))
        # Initializing WordNetLemmatizer
        self.wordnet_lemmatizer = WordNetLemmatizer()

    def pos_tagger(self, nltk_tag):
        """
        Take a POS tag, and return a wordnet equivalent tag for use in lemmatization
        """
        if nltk_tag.startswith("J"):
            return wordnet.ADJ
        elif nltk_tag.startswith("V"):
            return wordnet.VERB
        elif nltk_tag.startswith("N"):
            return wordnet.NOUN
        elif nltk_tag.startswith("R"):
            return wordnet.ADV
        else:
            return None


    def read_data(self, path: str) -> list:
        """
        Read files from a directory and then append the data of each file into a list.
        """
        # If path doesn't end with forwardslash, add it
        if path[-1] != "/":
            path = path + "/"

        # Look through all the filenames, get any with .csv in name
        files_list = sorted(
            [filename for filename in os.listdir(path) if ".csv" in filename]
        )

        # Initialize a list to store all terms from the csv files
        special_tokens = []
        # Iterate through each .csv file
        for filename in files_list:
            # read the contents of .csv file using the pandas library
            names = pd.read_csv(path + filename)
            # Take column 'name' and convert to list, append to apecial_tokens
            names = names["name"].to_list()
            special_tokens = [*special_tokens, *names]

        # Convert all special tokens into lowercase for case insensitivity
        special_tokens = [token.lower() for token in special_tokens]
        # Initialize the multi-word tokenizer with terms extracted from csv files
        self.mwe_tokenizer = MWETokenizer([tuple(word.split(" ")) for word in special_tokens])


        # Look through all the filenames, get any with .txt in name
        files_list = sorted(
            [filename for filename in os.listdir(path) if ".txt" in filename]
        )

        # Initialize a list to store lists of tokens extracted from the txt files
        tokens_list = []
        # file id starts from 0
        file_id = 0
        # Iterate through each .txt file
        for filename in files_list:
            # Extract the contents of the file as string
            with open(path + filename, "r", encoding="utf8") as file:
                raw_text = file.read()
            file.close()
            # Map each file id with respective file name
            self.file_map[file_id] = filename
            # Convert raw text into all lowercase first
            # then process_document with the extracted raw text
            tokens = self.process_document(raw_text.lower())
            # Take the processed tokens and append it to the list of tokens
            tokens_list.append(tokens)
            file_id += 1

        # Return the full set of tokens generated from the text files
        return tokens_list
        
    def process_document(self, document: str) -> list:
        """
        pre-process a document and return a list of its terms
        str->list"""

        # Regex to remove any square brackets and any content inside them
        # None of the square brackets contain meaningful words in our corpus
        raw_text = re.sub("\[.*?\]", "", document)

        # Removing formatting text that appears in nearly every article
        raw_text = raw_text.replace("From Wikipedia, the free encyclopedia", "")
        raw_text = raw_text.replace("Jump to navigationJump to search", "")
        raw_text = raw_text.replace("List of episodes", "")
        raw_text = raw_text.replace("Plot", "")
        raw_text = raw_text.replace("Production", "")
        raw_text = raw_text.replace("Reception", "")
        raw_text = raw_text.replace("References", "")
        raw_text = raw_text.replace("External links", "")

        # Make use of TweetTokenizer to split raw text
        tokens_with_punctuations = self.unigram_tokenizer.tokenize(raw_text)
        
        # Removing punctucations and unnecessary symbols
        # punctuations are lifted from string.punctuations
        # Exceptions are ' # - and _
        punctuations = '\t←→!"$%&()*+, ./:;<=>?@[\]^`{|}~'

        # We make use of a trans table to remove specified symbols
        # By providing 3 arguments, we remove all occurrances of 
        # any characters present in the 3rd argument
        trans_table = str.maketrans("", "", punctuations)
        stripped_words = [word.translate(trans_table) for word in tokens_with_punctuations]
        
        # We iterate through each token in stripped_words
        # and remove any leftover empty strings
        tokens_without_punctuations = [str for str in stripped_words if str]


        ##### POS TAGGING
        pos_tagged = nltk.pos_tag(tokens_without_punctuations)
        wordnet_tagged = list(map(lambda x: (x[0], self.pos_tagger(x[1])), pos_tagged))

        lemmatized = [
            word if tag is None else self.wordnet_lemmatizer.lemmatize(word, tag)
            for word, tag in wordnet_tagged
        ]
        #####

        # Converting any occurrances of multi-word phrases from .csv files
        # into singular, multi-word tokens
        multi_tokens = self.mwe_tokenizer.tokenize(tokens_without_punctuations)
        # Replacing '_' with whitespace for better presentation on output
        multi_tokens = [word.replace("_", " ") for word in multi_tokens]
        # Removing stopwords on multi-word tokens
        # We replace all stop words with placeholders to keep the word position consistent
        multi_tokens_without_sw = [word if not word in self.stop_words else "<place_holder>" for word in multi_tokens]
        # Doing the same for unigram tokens
        uni_tokens_without_sw = [word if not word in self.stop_words else "<place_holder>" for word in lemmatized]

        # Return a list containing two lists of tokens
        # One containing unigrams, one also containing multi-word tokens
        token_tuple = [uni_tokens_without_sw, multi_tokens_without_sw]
        return token_tuple
    
    def index_corpus(self, documents: list) -> None:
        """
        index given documents
        list->None"""

        # Iterate through each set of tokens extracted from documents
        #inv_index[term][0]-> incidences of documents of the term / inv_index[term][1]-> file id , postions appended
        for file_id, token_tuple in enumerate(documents):
            # Iterate through each unigram tokens(token_tuple[0])
            for position, term in enumerate(token_tuple[0]):
                # Skip any occurrances of placeholder tokens
                if term != "<place_holder>":
                    # If term has occurred before
                    if term in self.inv_index:
                        # If term has occurred before in current document
                        if self.file_map[file_id] in self.inv_index[term][1]:
                            # Add document position to list
                            self.inv_index[term][1][self.file_map[file_id]].append(position)
                        # If this is first occurrance of the term in current document
                        else:
                            # Increment document freq by 1
                            self.inv_index[term][0] += 1
                            # Initialize list of positions in document with current position
                            self.inv_index[term][1][self.file_map[file_id]] = [position]
                    # If this is first occurrance of the term in inverted index
                    else:
                        # Initialize list with term as key
                        # item 0 is 1 since this is the first occurrance
                        # so document frequency is 1
                        self.inv_index[term] = [1, {}]
                        # Add current occurance
                        self.inv_index[term][1][self.file_map[file_id]] = [position]

            # for keeping track of cumulative offset: Without this, the postion ofupcoming multi-word tokens is not correct due to the length of the multi-word
            # see report for detail
            c_offset = 0
            # Iterate through each tokens in list of tokens with multi-word tokens(token_tuple[1])
            for position, term in enumerate(token_tuple[1]):
                # Skip any occurrances of single-word tokens
                if len(term.split(" ")) > 1:
                    # If term has occurred before
                    if term in self.inv_index:
                        # If term has occurred before in current document
                        if self.file_map[file_id] in self.inv_index[term][1]:
                            # Add document position to list
                            self.inv_index[term][1][self.file_map[file_id]].append(position + c_offset)
                        # If this is first occurrance of the term in current document
                        else:
                            # Increment document freq by 1
                            self.inv_index[term][0] += 1
                            # Initialize list of positions in document with current position
                            self.inv_index[term][1][self.file_map[file_id]] = [position + c_offset]
                    # If this is first occurrance of the term in inverted index
                    else:
                        # Initialize list with term as key
                        # item 0 is 1 since this is the first occurrance
                        # so document frequency is 1
                        self.inv_index[term] = [1, {}]
                        # Add current occurance
                        self.inv_index[term][1][self.file_map[file_id]] = [position + c_offset]
                    # increment cumulative offset
                    c_offset += len(term.split(" ")) - 1

     
    def dump(self, path: str) -> None:
        """
        provide a dump function to show index entries for a given set of terms        
        """
        # Open the file with development examples
        with open(path, "r", encoding="utf8") as file:
            _text = file.read()
        file.close()

        _text = _text.lower().split("\n")
        pos_tagged = nltk.pos_tag(_text)
        print(pos_tagged)
        wordnet_tagged = list(map(lambda x: (x[0], self.pos_tagger(x[1])), pos_tagged))
        print(wordnet_tagged)
        _text_lemmatized = [
            self.wordnet_lemmatizer.lemmatize(word, tag) if len(word.split(" ")) < 2 else word for word, tag in wordnet_tagged
        ]

        for index, term in enumerate(_text_lemmatized):
            print("Input: " + _text[index] + ", Lemmatized: " + term)
            if term in self.inv_index:
                print(self.inv_index[term])
            else:
                print("Term not present in index")

       
    def proximity_search(self, term1: str, term2: str, window_size: int) -> dict:
        """
        This is Task 2"""
        #returning search result
        search_results = {}

        # Setting a list of given terms for searching
        search_terms = [term1, term2]
        
        # lemmmatize the search terms more details are given in the report
        lem_search_terms = [self.wordnet_lemmatizer.lemmatize(search_terms[0]),
                            self.wordnet_lemmatizer.lemmatize(search_terms[1])]

        # setting indices in inverted index for term1 and term2
        term1_indices = self.inv_index[lem_search_terms[0]]
        term2_indices = self.inv_index[lem_search_terms[1]]

        # setting window size
        self.window_size = window_size

        # Store and list the files that has both terms occuring
        comm_docs = [
            key
            for key in term1_indices[1]
            if key in term2_indices[1]
        ]

        for doc in comm_docs:
          # setting indices of common document containing term1 and term2
          t1_indices_in_doc = term1_indices[1][doc]
          t2_indices_in_doc = term2_indices[1][doc]

          # print(t1_indices_in_doc)
          # print(t2_indices_in_doc)
          for t1_index in t1_indices_in_doc:
            for t2_index in t2_indices_in_doc:
              # Get indices when absolute proximity of the two terms is smaller than the given window_size
              if abs(t2_index - t1_index) <= self.window_size - 1:
                if doc not in search_results:
                  search_results[doc] = [1,[(t1_index, t2_index)]]
                
                else:
                  search_results[doc][0] += 1
                  search_results[doc][1].append((t1_index,t2_index))
        

        return search_results

    

In [11]:
def main():
    "main call function"
    start = time.time() # Checking the time before initializing the inverted index

    index = InvertedIndex() # initilaise the index
    corpus = index.read_data('drive/MyDrive/Simpsons-2022') # specify the directory path in which files are located
    index.index_corpus(corpus) # index documents/corpus

    end = time.time() # Checking the time after inverted index is created

    # ## testing the dump function sith the development-examples.txt
    index.dump('drive/MyDrive/ColabNotebooks/development-examples.txt')

    # ## checking inverted index
    # print(index.inv_index)


    # ## print multi-words of inverted index
    # print("\n")
    # has_space = False
    # for key in index.inv_index.keys():
    #   for c in key:
    #     if c.isspace():
    #       has_space = True
    #     else:
    #       continue
    #   if has_space == True:
    #     print(key)
    #     has_space = False
    #   else:
    #     continue

    # ## checking if the keys of inverted index have $%#
    # for key in index.inv_index:
    #     if('$' in key or '%' in key or '#' in key):
    #         print(key, index.inv_index[key])

    # ## printing length of the inverted index(number of vocabs)
    print(len(index.inv_index))

    # ## testing proximity search
    # print(index.proximity_search('rock','stars',3))
    # print(index.proximity_search('rock','station',3))

    # ## printing time taken to construct inverted index
    print("EXECUTION TIME: {}".format(end - start) + " sec")
    return index
index = main()

[('bart', 'NN'), ('first', 'RB'), ('image', 'NN'), ('montage', 'NN'), ('well', 'RB'), ('top', 'JJ'), ('arguably', 'RB'), ('best', 'JJS'), ('number', 'NN'), ('humor', 'NN'), ('dollarydoos', 'JJ'), ('bart simpson', 'NN'), ('gordie howe', 'NN'), ('recalled', 'VBD'), ('bart the lover', 'NN'), ('cents', 'NNS'), ('won', 'VBD'), ('voice-overs', 'NNS'), ('simpsonovi', 'NN')]
[('bart', 'n'), ('first', 'r'), ('image', 'n'), ('montage', 'n'), ('well', 'r'), ('top', 'a'), ('arguably', 'r'), ('best', 'a'), ('number', 'n'), ('humor', 'n'), ('dollarydoos', 'a'), ('bart simpson', 'n'), ('gordie howe', 'n'), ('recalled', 'v'), ('bart the lover', 'n'), ('cents', 'n'), ('won', 'v'), ('voice-overs', 'n'), ('simpsonovi', 'n')]
Input: bart, Lemmatized: bart
[114, {'3.1.txt': [201, 469, 519, 619, 656, 662, 687, 708, 959, 980, 1416, 1421, 1812, 1849, 2034, 2947, 3001], '3.10.txt': [1623], '3.11.txt': [683, 1173, 1222], '3.12.txt': [144, 209, 371, 451, 796, 800, 837, 944, 1129], '3.13.txt': [1, 14, 98, 133, 18