<a href="https://colab.research.google.com/github/JonasVerbickas/test-jupyter/blob/main/NLP_CW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import nltk
import re
import string
import unicodedata
import time
import pandas as pd
from matplotlib import pyplot as plt
from collections import Counter
from collections import UserDict
from tqdm import tqdm

I'm not sure how you will access the corpus on your side.
Personally, I've uploaded it to my Google drive.

In [2]:
from google.colab import drive
drive.mount('/content/drive')

MessageError: ignored

In [13]:
porter = nltk.PorterStemmer()
porter.stem("home")

'home'

# Download additional NLTK resources
These are required for tokenization and stop-word removal

In [None]:
nltk.download('punkt')
nltk.download('stopwords')
# I convert it into a set, because it results in faster `in` look-ups
STOP_WORDS = nltk.corpus.stopwords.words('english')
# convert to set() for faster `in` checks
STOP_WORDS = set(STOP_WORDS)
# for some reason would is missing by default
STOP_WORDS.add('would')
# t an `episode` most common term is the possesive `'s`
# i'd argue that it should be removed, because multi-word terms that contain possesives
# should be findable without `'s` e.g. "moe tavern" should match with "Moe's tavern", "bart treehouse" should match "Bart's treehouse"
# this will lead to some 
STOP_WORDS.add("'s")
# this is supposed to be used for tokenized 'won't' however it eliminates `won` as in winning as well
# I will expand all contractions and this stop-word will not be needed
STOP_WORDS.remove('won')
STOP_WORDS

# Implementation


## Initialize helper classes
I could've coded everything using tuples/lists instead of these helper classes.

However, I feel like these make the code more verbose and remove ambiguity that comes from giving positions in a tuple meaning.

In [None]:
class StringWithDocId:
  """
  Used in InvertedIndex.read_data method to create a list of document contents with doc_ids 
  """ 
  def __init__(self, string, doc_id):
    self.string = string
    self.doc_id = doc_id
  
  def __str__(self):
    return f"{self.string}: {self.doc_id}"

  def __repr__(self):
    return f"{self.string}: {self.doc_id}"

In [None]:
class StringWithDocIdAndPosition(StringWithDocId):
  """
  Stores positions as well.
  Used to create & sort list of positional terms.
  
  """
  def __init__(self, string, doc_id, position):
    super().__init__(string, doc_id)
    self.position = position
  
  def __lt__(token_with_doc_A, token_with_doc_B):
    if token_with_doc_A.string != token_with_doc_B.string:
      return token_with_doc_A.string < token_with_doc_B.string
    elif token_with_doc_A.doc_id != token_with_doc_B.doc_id:
      return token_with_doc_A.doc_id < token_with_doc_B.doc_id
    else:
      return token_with_doc_A.position < token_with_doc_B.position

In [None]:
class Posting(UserDict):
  """
  This stores the positional information for each term in the postional inverted index.
  Some methods have been overriden to its objects behave like dictionary.
  Keys are all of documents where this term appears;
  Values store the positions, where in that document term appears
  self.total_occurances stores the document_frequency
  {docID: [pos1, pos2, ...]}.
  """
  def __init__(self):
    super().__init__()
    self.posting_dic = {}
    self.total_occurances = 0
  
  def __contains__(self, doc_id):
    return doc_id in self.posting_dic
  
  def __iter__(self):
      return iter(self.posting_dic.items())

  def __getitem__(self, doc_id):
      return self.posting_dic[doc_id]
    
  def get(self, k, default=None):
    return self[k]
  
  def __len__(self):
      return len(self.posting_dic)

  def add(self, doc_id, position):
    if doc_id in self.posting_dic:
      self.posting_dic[doc_id].append(position)
    else:
      self.posting_dic[doc_id] = [position]
    self.total_occurances += 1
  
  def __str__(self):
    return f"{self.total_occurances} total occurances: {[f'{len(positions)} in {doc_id}: {positions}' for doc_id, positions in self.posting_dic.items()]}"
  
  def __repr__(self):
    return str(self)

In [None]:
# this map is from https://github.com/dipanjanS/practical-machine-learning-with-python/blob/master/bonus%20content/nlp%20proven%20approach/contractions.py
# I found it via https://towardsdatascience.com/a-practitioners-guide-to-natural-language-processing-part-i-processing-understanding-text-9f4abfd13e72
# removing contractions ensures more matches with stop-words and fixes the inconsitency of apostrohpes between stop-words and tokenizer
# this will remove apostrophes and solve the issue of inconsistency between tokenization and stop-words
def expandContractions(text):
  """
  This function will iterate through the whole list of contractions
  and replace all contracted forms in the given text with the full-length versions
  """
  CONTRACTION_MAP = {
  "ain't": "is not",
  "aren't": "are not",
  "can't": "cannot",
  "can't've": "cannot have",
  "'cause": "because",
  "could've": "could have",
  "couldn't": "could not",
  "couldn't've": "could not have",
  "didn't": "did not",
  "doesn't": "does not",
  "don't": "do not",
  "hadn't": "had not",
  "hadn't've": "had not have",
  "hasn't": "has not",
  "haven't": "have not",
  "he'd": "he would",
  "he'd've": "he would have",
  "he'll": "he will",
  "he'll've": "he he will have",
  "he's": "he is",
  "how'd": "how did",
  "how'd'y": "how do you",
  "how'll": "how will",
  "how's": "how is",
  "I'd": "I would",
  "I'd've": "I would have",
  "I'll": "I will",
  "I'll've": "I will have",
  "I'm": "I am",
  "I've": "I have",
  "i'd": "i would",
  "i'd've": "i would have",
  "i'll": "i will",
  "i'll've": "i will have",
  "i'm": "i am",
  "i've": "i have",
  "isn't": "is not",
  "it'd": "it would",
  "it'd've": "it would have",
  "it'll": "it will",
  "it'll've": "it will have",
  "it's": "it is",
  "let's": "let us",
  "ma'am": "madam",
  "mayn't": "may not",
  "might've": "might have",
  "mightn't": "might not",
  "mightn't've": "might not have",
  "must've": "must have",
  "mustn't": "must not",
  "mustn't've": "must not have",
  "needn't": "need not",
  "needn't've": "need not have",
  "o'clock": "of the clock",
  "oughtn't": "ought not",
  "oughtn't've": "ought not have",
  "shan't": "shall not",
  "sha'n't": "shall not",
  "shan't've": "shall not have",
  "she'd": "she would",
  "she'd've": "she would have",
  "she'll": "she will",
  "she'll've": "she will have",
  "she's": "she is",
  "should've": "should have",
  "shouldn't": "should not",
  "shouldn't've": "should not have",
  "so've": "so have",
  "so's": "so as",
  "that'd": "that would",
  "that'd've": "that would have",
  "that's": "that is",
  "there'd": "there would",
  "there'd've": "there would have",
  "there's": "there is",
  "they'd": "they would",
  "they'd've": "they would have",
  "they'll": "they will",
  "they'll've": "they will have",
  "they're": "they are",
  "they've": "they have",
  "to've": "to have",
  "wasn't": "was not",
  "we'd": "we would",
  "we'd've": "we would have",
  "we'll": "we will",
  "we'll've": "we will have",
  "we're": "we are",
  "we've": "we have",
  "weren't": "were not",
  "what'll": "what will",
  "what'll've": "what will have",
  "what're": "what are",
  "what's": "what is",
  "what've": "what have",
  "when's": "when is",
  "when've": "when have",
  "where'd": "where did",
  "where's": "where is",
  "where've": "where have",
  "who'll": "who will",
  "who'll've": "who will have",
  "who's": "who is",
  "who've": "who have",
  "why's": "why is",
  "why've": "why have",
  "will've": "will have",
  "won't": "will not",
  "won't've": "will not have",
  "would've": "would have",
  "wouldn't": "would not",
  "wouldn't've": "would not have",
  "y'all": "you all",
  "y'all'd": "you all would",
  "y'all'd've": "you all would have",
  "y'all're": "you all are",
  "y'all've": "you all have",
  "you'd": "you would",
  "you'd've": "you would have",
  "you'll": "you will",
  "you'll've": "you will have",
  "you're": "you are",
  "you've": "you have"
  }
  for k, v in CONTRACTION_MAP.items():
    text = text.replace(k, v)
  return text

In [None]:
def wikipediaPreprocessing(text):
  # stop-words and regex only contain lower case versions of words
  # therefore tokens must be lowercased in order to match them
  # casefold() is a more aggresive version of lower()
  # https://docs.python.org/3/library/stdtypes.html#str.casefold
  text = text.casefold()
  # reduce characters into ascii where meaning is preserved & seprate diacritics into individual characters
  text = unicodedata.normalize('NFKD', text)
  # remove all non ascii character i.e. separated diacritics
  text = text.encode('ASCII', 'ignore').decode('UTF-8')
  text = text.replace('[edit]', '')
  text = re.sub(r'\[\d+\]', '', text)
  # remove table of contents
  text = re.sub(r'contents\s+1\s+plot.+plot', '', text)
  # remove navigational hyperlinks
  text = re.sub(r'episode chronology(.|\n)+list of episodes', '', text)
  text = re.sub(r'jump to (navigation|search)', '', text)
  # remove other irrelevant wikipedia filler
  text = text.replace('from wikipedia, the free encyclopedia', '')
  text = text.replace('the simpsons episode', '')
  # expand contractions
  text =  expandContractions(text)
  return text

## Spelling correction!!
From P. Norvig's website: https://norvig.com/spell-correct.html.

In [None]:
class SpellCorrect:

  def __init__(self, posting_dict):
    self.total_num_of_words = 0
    self.word_freq_dic = {} 
    for key, val in posting_dict.items():
      self.total_num_of_words += val.total_occurances
      # I don't store positions here, because in case of spell check only frequencies are needed
      self.word_freq_dic[key] = self.total_num_of_words

  def _P(self, word): 
      "Probability of `word`."
      if word in self.word_freq_dic:
        return self.word_freq_dic[word] / self.total_num_of_words
      # If a word doesn't exist within the index
      # It's probality is 0 - this is a way to get rid of words that are not real
      else:
        return 0

  def correction(self, word): 
      "Most probable spelling correction for word."
      most_probable_correction = max(self._candidates(word), key=self._P)
      # if no correction is possible just return None
      if self._P(most_probable_correction) == 0:
        return None
      else:
        return most_probable_correction

  def _candidates(self, word): 
      "Generate possible spelling corrections for word."
      return (self._known([word]) or self._known(self._edits1(word)) or self._known(self._edits2(word)) or [word])

  def _known(self, words): 
      "The subset of `words` that appear in the dictionary of WORDS."
      return set(w for w in words if w in self.word_freq_dic.keys())

  def _edits1(self, word):
      "All edits that are one edit away from `word`."
      letters    = 'abcdefghijklmnopqrstuvwxyz'
      splits     = [(word[:i], word[i:])    for i in range(len(word) + 1)]
      deletes    = [L + R[1:]               for L, R in splits if R]
      transposes = [L + R[1] + R[0] + R[2:] for L, R in splits if len(R)>1]
      replaces   = [L + c + R[1:]           for L, R in splits if R for c in letters]
      inserts    = [L + c + R               for L, R in splits for c in letters]
      return set(deletes + transposes + replaces + inserts)

  def _edits2(self, word): 
      "All edits that are two edits away from `word`."
      return (e2 for e1 in self._edits1(word) for e2 in self._edits1(e1))

In [None]:
class MultiWordTermSet:
  """
  Used to store multi-word terms;
  when prompted finds all matching subsequences that match the stored terms
  Currently used for multi-token episode titles, character and location names.
  """

  def __init__(self):
    # this will store terms in different sets based on the number of tokens each term has
    # key [2] will have a set of processed terms of 2 tokens in length
    self.dict_of_sets = dict()

  def add(self, processed_tokens):
    # anything of size 1, isn't a multi-word term - don't waste space storing it
    if len(processed_tokens) > 1:
      if len(processed_tokens) not in self.dict_of_sets:
        # this is a set because there are some duplicates in the CSV files
        self.dict_of_sets[len(processed_tokens)] = set()
      self.dict_of_sets[len(processed_tokens)].add(" ".join(processed_tokens))

  def findMatchingSubsequences(self, list_of_processed_tokens: list) -> set:
    """
    Go through list_of_processed_tokens and try to find matching subsequences 
    Return all found matches
    """
    list_of_processed_tokens = list_of_processed_tokens[:max(self.dict_of_sets.keys())]
    matching_subseqs = set()
    for i in self.dict_of_sets.keys():
      i_joined_tokens = " ".join(list_of_processed_tokens[:i])
      if i_joined_tokens in self.dict_of_sets[i]:
        matching_subseqs.add(i_joined_tokens)
    return matching_subseqs


## The Inverted Index (THE MOST IMPORTANT BIT)

In [None]:
class InvertedIndex:
    """
    Construct Inverted Index
    """
    def __init__(self):
        self.inverted_index: dict = {}
        self.csv_terms = MultiWordTermSet() 
        self.ep_titles = MultiWordTermSet() 
  
    def _appendCSV(self, path: str) -> None:
      """
      Used to fill up csv_terms that are later used to index multi-word terms within the corpus
      1)opens a csv in the given path
      2)processes each term, so that it matches the tokenized corpus
      3)and appends to csv_terms set"""
      df = pd.read_csv(path)
      set_of_csv_entries = df['name']
      for term in set_of_csv_entries:
        # there is a bunch of numbered entries (~500) like "Relative #1" and "Relative #2"
        # these should be considered to be the same entry,
        # because characters aren't numbered like this in the corpus
        term = re.sub(r'#\d+', '', term)
        processed_tokens = self.processListOfTokens(self.process_document(term))
        self.csv_terms.add(processed_tokens)
    
    def _appendTitle(self, text):
      """This will extract the title (first line) from the given text, process it
      lastly append it to list of known titles"""
      unprocessed_ep_title = text.strip().split('\n')[0]
      processed_tokens = self.processListOfTokens(self.process_document(unprocessed_ep_title))
      self.ep_titles.add(processed_tokens)

    def read_data(self, path: str) -> list:
        """
        Read files from a directory and then append the data of each file into a list.
        """
        document_content_list = []
        # go through all of the files in the given directory
        print('read_data() has been called')
        for file in tqdm(os.listdir(path)):
          file_ext = os.path.splitext(file)[1].lower()
          filename_with_path = os.path.join(path, file)
          # if it's a txt file - append its contents to the output list
          if file_ext == '.txt':
            with open(filename_with_path, 'r') as f:
              text = f.read()
              document_content_list.append(StringWithDocId(text, file))
              # extract the title and use it as a multi-token indexing
              self._appendTitle(text)
          # if it's a csv file - assume it's a character/location list and use it for multi-token indexing
          elif file_ext == '.csv':
            self._appendCSV(filename_with_path)
          else:
            print(filename_with_path, " doesn't have the required file extension 'txt' or 'csv' - it will be skipped")
        print("Number of documents within the corpus:", len(document_content_list)) 
        return document_content_list
      
    def processListOfTokens(self, tokenized: list) -> list:
      """
      in a given list of tokens: removes stop-words, punctuation and stems the words.
      """
      output = []
      for token in tokenized:
        # ignore tokens that don't have words/numbers in them i.e. punctuation only
        if not re.search('\w', token):
          continue
        # delete hyphens
        token = token.replace('-', '')
        # ignore stop-words
        if token in STOP_WORDS:
          continue
        # apply stemming
        stemmed = porter.stem(token)
        output.append(stemmed)
      return output

    def process_document(self, document: str) -> list:
        """
        pre-process a document and return a list of its terms
        str->list"""
        # Wikipedia hyperlinks should be removed
        text = wikipediaPreprocessing(document)
        tokenized = nltk.tokenize.word_tokenize(text)
        # remove stop-words & apply stemming
        filtered_tokens = self.processListOfTokens(tokenized)
        return filtered_tokens

    def _findMultiWordTerms(self, processed_token_subseq: list):
      """
      This method tries to find all known multi-token subsections; starting from index [0] of the given processed_substring
      """        
      csv_subseqs = self.csv_terms.findMatchingSubsequences(processed_token_subseq)
      title_subseqs = self.ep_titles.findMatchingSubsequences(processed_token_subseq)
      return csv_subseqs.union(title_subseqs)
    
    def index_corpus(self, documents: list) -> None:
        """
        index given documents
        list->None"""
        starting_time = time.perf_counter()
        token_list = []

        # 1. Generate token list
        print("1. Generate token list")
        for doc in tqdm(documents):
          curr_doc_id = doc.doc_id
          tokenized_doc = self.process_document(doc.string)
          for i, token in enumerate(tokenized_doc):
            matching_subseqs = self._findMultiWordTerms(tokenized_doc[i:])
            # append all multi-word matches
            for match in matching_subseqs:
              match_with_doc_id_and_pos = StringWithDocIdAndPosition(match, curr_doc_id, i)
              token_list.append(match_with_doc_id_and_pos)
            # append current word
            token_with_doc_id_and_pos = StringWithDocIdAndPosition(token, curr_doc_id, i)
            token_list.append(token_with_doc_id_and_pos)
        # 2. Sort
        print("Sorting token list")
        sorted_token_list = sorted(token_list)
        # 3. Convert into dictionary of postings
        print("Creating a positional dictionary of postings")
        for i, token in enumerate(tqdm(sorted_token_list)):
          if token.string not in self.inverted_index:
            self.inverted_index[token.string] = Posting()
          self.inverted_index[token.string].add(token.doc_id, token.position)
        # Initialize spell-correct
        self.spell_correct = SpellCorrect(self.inverted_index)
        # Print out some details about the dataset
        total_time_taken = round(time.perf_counter() - starting_time, 4)
        print(f"It took: {total_time_taken} seconds to index the whole corpus.")
        print(f"It has {len(self.inverted_index)} entries in total.")

    def _processQuery(self, q: str) -> str:
        query_as_list_of_processed_tokens = self.processListOfTokens(self.process_document(q))
        # if query is a stop-word it will return None
        if len(query_as_list_of_processed_tokens) == 0:
          print(f"Query '{q}' a stop-word")
          return None
        # join a processed multi-word query
        term = " ".join(query_as_list_of_processed_tokens)
        print(f"{q} -> {term}", end="; ")
        if term in self.inverted_index:
          return term
        # attempt spell-correct
        else:
          most_probable_correction = self.spell_correct.correction(term)
          if most_probable_correction == None:
            print("No viable spelling fix was found")
          return most_probable_correction
      
    def dump(self, path: str) -> None:
        """
        provide a dump function to show index entries for a given set of terms        
        """
        print("passed_in_query -> query_after_processing: [list_of_occurances]")
        if os.path.exists(path) == False:
          print("Path to file you provided doesn't exist")
          return
        with open(path, 'r') as f:
          file_contents = f.read()
          examples = file_contents.split('\n')
          for query in examples:
            processed_query = self._processQuery(query)
            # if even after spell-correction no matching terms are found processed query will be None
            if processed_query is not None:
              print(processed_query, self.inverted_index[processed_query])
     
    def proximity_search(self, term1: str, term2: str, window_size: int = 3) -> dict:
        """
        1) check whether given two terms appear within a window
        2) calculate the number of their co-existance in a document
        3) add the document id and the number of matches into a dict
        return the dict"""
        term1 = self._processQuery(term1)
        if term1 is None:
          print("Cannot find an entry in Inverted Index for", term1)
          return
        term2 = self._processQuery(term2)
        if term2 is None:
          print("Cannot find an entry in Inverted Index for", term2)
          return
        documents_containing_both_terms = {}
        # i'm aware that this is usually implemented using `while` loops
        # however in python I can do it using for loops
        for term1_doc_id, term1_positions in self.inverted_index[term1]:
          # if both terms can be found in the same document
          if term1_doc_id in self.inverted_index[term2]:
            # start iterating through positions of term1
            for term1_position in term1_positions:
              # for each position of term1 check distance of all positions of term2 
              # (this loop can be broken early because the posting list sorted)
              for term2_position in self.inverted_index[term2][term1_doc_id]:
                if abs(term2_position - term1_position) <= window_size:
                  if term1_doc_id not in documents_containing_both_terms:
                    documents_containing_both_terms[term1_doc_id] = []
                  documents_containing_both_terms[term1_doc_id].append((term1_position, term2_position))
                # if term2 has passed the window of term1, move on to another term1 position
                elif term2_position - term1_position > window_size:
                  break
        return documents_containing_both_terms

# Testing my implementation

In [None]:
def main():
    "main call function"
    index = InvertedIndex() # initilaise the index
    corpus = index.read_data('/content/drive/MyDrive/Colab Notebooks/Simpsons2022') # specify the directory path in which files are located
    index.index_corpus(corpus) # index documents/corpus
    return index
index = main()

## Test `dump` method using `.txt` provided on BlackBoard

In [None]:
index.dump("/content/drive/MyDrive/Colab Notebooks/26957722.txt")

### Print out most frequent words within the corpus

In [None]:
sorted([(k,v.total_occurances) for k, v in index.inverted_index.items()], key=lambda x: x[1], reverse=True)[:100]

## Test positional indexing

In [None]:
# it's possible to find Gordie Howe using Boolean queries, even though he's not in the index
index.proximity_search('Gordie', 'Howe', 1)

In [None]:
# diacritics will be converted to ascii and matched
index.proximity_search('üter', 'character', 1)