## NLP Lab Assignment 2

***Student Details:***

- Name : Anjishnu Mukherjee
- Registration Number : B05-511017020
- Exam Roll Number : 510517086
- Email : 511017020.anjishnu@students.iiests.ac.in

## Mount Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


## Data files

In [2]:
data_dir = "/content/drive/MyDrive/NLP_LAB/Assignment-2/"
data_file_1 = "big.txt"
data_file_2 = "shakespeare.txt"

## Import libraries

In [3]:
import functools
import math
from typing import Dict, List, Optional, Any

import re
from collections import Counter

## Minimum Edit Distance Class

In [4]:
# utility function
def ignore_unhashable(func):
    """[To allow caching the distance matrix in its original form, which is a
        list of lists. Reference : ->
        https://stackoverflow.com/a/64111268/11009359]

    Args:
        func ([type]): [The function where we intend to apply this decorator.]
    """
    uncached = func.__wrapped__
    attributes = functools.WRAPPER_ASSIGNMENTS + ("cache_info", "cache_clear")

    @functools.wraps(func, assigned=attributes)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except TypeError as error:
            if "unhashable type" in str(error):
                return uncached(*args, **kwargs)
            raise

    wrapper.__uncached__ = uncached
    return wrapper


class MinimumEditDistance:
    def __init__(
        self,
        insertion_cost: Optional[int] = 1,
        deletion_cost: Optional[int] = 1,
        substitution_cost: Optional[int] = 2,
    ) -> None:
        """[Creates an instance for the class.]

        Args:
            insertion_cost (Optional[int], optional): [description]. Defaults to 1.
            deletion_cost (Optional[int], optional): [description]. Defaults to 1.
            substitution_cost (Optional[int], optional): [description]. Defaults to 2.
        """
        self.__insertion_cost = insertion_cost
        self.__deletion_cost = deletion_cost
        self.__substitution_cost = substitution_cost

    def align(self, query: str, target: str) -> Dict:
        """[Compute edit distance. Return a dictionary containing the edit
            distance, the matrix from Wagner Fischer algorithm, a single optimal
            alignment, and a list of all possible alignments.]

        Args:
            query (str): [The query string.]
            target (str): [The target string.]

        Returns:
            Dict: [Wagner Fischer results, single optimal alignment, all optimal
            alignments]
        """
        wagner_fischer_results = self.weightedWagnerFischer(query, target)

        single_optimal_alignment = self.getOptimalAlignment(
            query, target, wagner_fischer_results["distance_matrix"]
        )

        all_optimal_alignments = self.getAllOptimalAlignments(
            query, target, wagner_fischer_results["distance_matrix"]
        )

        result = dict()
        result["wagner_fischer_results"] = wagner_fischer_results
        result["single_optimal_alignment"] = single_optimal_alignment
        result["all_optimal_alignments"] = all_optimal_alignments
        return result

    def niceMatrix(self, matrix: List) -> None:
        """[Prints a matrix in a nice human readable format.]

        Args:
            matrix (List): [The matrix which will be printed, stored as a list
            of lists.]
        """
        print(
            "\n".join(
                [
                    "".join(["{:4}".format(item) for item in row])
                    for row in matrix
                ]
            )
        )

    def niceAlignment(self, alignment: Dict) -> None:
        """[Prints the alignment of query and target in a nice human readable
        format.]

        Args:
            alignment (Dict): [A dictionary containing the aligned query and
            target strings, and a string for the actions needed for alignment.
            For the string of operations, d is for delete, i is for insertion, |
            is for match, s is for substitution. For the aligned target and
            query strings, - corresponds to an insertion or deletion.]
        """
        query, target, operations = (
            alignment["aligned_query"],
            alignment["aligned_target"],
            alignment["operations"],
        )
        print(
            " ".join(query)
            + "\n"
            + " ".join(operations)
            + "\n"
            + " ".join(target)
        )

    def weightedWagnerFischer(self, query: str, target: str) -> Dict:
        """[Implements the Wagner Fischer algorithm. Details of the algorithm
        can be found here ->
        https://en.wikipedia.org/wiki/Wagner–Fischer_algorithm .]

            Args:
                query (str): [The query string.]
                target (str): [The target string.]

            Returns:
                Dict: [Contains the edit distance between query and target
                strings and also the distance matrix.]
        """
        len1, len2 = len(query), len(target)
        distance = [[0 for i in range(len2 + 1)] for j in range(len1 + 1)]

        # query prefixes can be transformed into empty target by deleting all
        # characters
        for i in range(len1 + 1):
            distance[i][0] = i * self.__deletion_cost

        # target prefixes can be reached from empty query prefix by inserting
        # every character
        for i in range(len2 + 1):
            distance[0][i] = i * self.__insertion_cost

        for i in range(1, len1 + 1):
            for j in range(1, len2 + 1):
                diagonal = distance[i - 1][j - 1] + (
                    0
                    if query[i - 1] == target[j - 1]
                    else self.__substitution_cost
                )
                left = distance[i][j - 1] + self.__insertion_cost
                up = distance[i - 1][j] + self.__deletion_cost

                distance[i][j] = min(diagonal, left, up)

        edit_distance = distance[len1][len2]
        wagner_fischer_results = dict()
        wagner_fischer_results["edit_distance"] = edit_distance
        wagner_fischer_results["distance_matrix"] = distance
        return wagner_fischer_results

    @staticmethod
    def reverseString(input_string: str) -> str:
        """[Utility method to reverse a string.]

        Args:
            input_string (str): [The string to be reversed.]

        Returns:
            str: [The reversed string.]
        """
        start = stop = None
        step = -1
        reverse_slice = slice(start, stop, step)
        return input_string[reverse_slice]

    def getOptimalAlignment(
        self, query: str, target: str, distance: List
    ) -> Dict:
        """[Get a single optimal alignment from the edit distance matrix.]

        Args:
            query (str): [The query string.]
            target (str): [The target string.]
            distance (List): [The edit distance matrix from Wagner Fischer
            algorithm.]

        Returns:
            Dict: [A dictionary containing the aligned query and target strings
            and a string of the operations needed for tha alignment.]
        """
        aligned_query, aligned_target, operation = "", "", ""
        i, j = len(query), len(target)

        while i != 0 or j != 0:

            # find the value of diagonal, top and left cells if they exist
            is_match = False
            diagonal, top, left = math.inf, math.inf, math.inf
            if i != 0 and j != 0:
                is_match = query[i - 1] == target[j - 1]
                diagonal = distance[i - 1][j - 1] + (
                    0 if is_match else self.__substitution_cost
                )
            if i != 0:
                top = distance[i - 1][j] + self.__deletion_cost
            if j != 0:
                left = distance[i][j - 1] + self.__insertion_cost

            # find global min of the three cells
            min_dist = min(diagonal, top, left)

            # check which of those 3 cells correspond to the global min
            if diagonal == min_dist:
                aligned_query += query[i - 1]
                aligned_target += target[j - 1]
                operation += "|" if is_match else "s"
                i -= 1
                j -= 1
            elif top == min_dist:
                aligned_query += query[i - 1]
                aligned_target += "-"
                operation += "d"
                i -= 1
            else:
                aligned_query += "-"
                aligned_target += target[j - 1]
                operation += "i"
                j -= 1

        aligned_query, aligned_target, operation = (
            self.reverseString(aligned_query),
            self.reverseString(aligned_target),
            self.reverseString(operation),
        )

        optimal_alignment = dict()
        optimal_alignment["aligned_query"] = aligned_query
        optimal_alignment["aligned_target"] = aligned_target
        optimal_alignment["operations"] = operation
        return optimal_alignment

    def getAllOptimalAlignments(
        self, query: str, target: str, distance: List
    ) -> List:
        """[Wrapper method that calls a private method which performs
        backtracking to return all optimal alignments. ]

        Args:
            query (str): [The query string.]
            target (str): [The target string.]
            distance (List): [The edit distance matrix from Wagner Fischer.]

        Returns:
            List: [A list of dictionaries, where each dictionary is an
            alignment.]
        """
        self.__alignments = []
        self.__get_all_optimal_alignments(
            query, target, distance, len(query), len(target), "", "", ""
        )
        return self.__alignments

    @ignore_unhashable
    @functools.lru_cache(maxsize=10)
    def __get_all_optimal_alignments(
        self,
        query: str,
        target: str,
        distance: List,
        i: int,
        j: int,
        aligned_query: str,
        aligned_target: str,
        operation: str,
    ) -> None:
        """[A backtracking algorithm that finds all paths from distance[m][n] to
        distance[0][0], where distance is the edit distance matrix from weighed
        Wagner Fischer algorithm, and m and n are the lengths of the query and
        target strings respectively. The paths found are the represented as
        alignments which are then stored in a private class member list. Up to
        10 most recent calls are cached in a lru cache.]

        Args:
            query (str): [The query string.]
            target (str): [The target string.]
            distance (List): [The edit distance matrix from weighted Wagner
            Fischer algorithm.]
            i (int): [The starting index for this recursive backtracking
            algorithm, for query string.]
            j (int): [The starting index for this recursive backtracking
            algorithm, for target string.]
            aligned_query (str): [Aligned query string.]
            aligned_target (str): [Aligned target string.]
            operation (str): [String of operations for converting query to
            target.]

        """

        if i == 0 and j == 0:
            optimal_alignment = dict()
            optimal_alignment["aligned_query"] = self.reverseString(
                aligned_query
            )
            optimal_alignment["aligned_target"] = self.reverseString(
                aligned_target
            )
            optimal_alignment["operations"] = self.reverseString(operation)
            self.__alignments.append(optimal_alignment)

        else:

            # find the value of diagonal, top and left cells if they exist
            is_match = False
            diagonal, top, left = math.inf, math.inf, math.inf
            if i != 0 and j != 0:
                is_match = query[i - 1] == target[j - 1]
                diagonal = distance[i - 1][j - 1] + (
                    0 if is_match else self.__substitution_cost
                )
            if i != 0:
                top = distance[i - 1][j] + self.__deletion_cost
            if j != 0:
                left = distance[i][j - 1] + self.__insertion_cost

            # find global min of the three cells
            min_dist = min(diagonal, top, left)

            # check which of those 3 cells correspond to the global min
            if diagonal == min_dist:
                self.__get_all_optimal_alignments(
                    query,
                    target,
                    distance,
                    i - 1,
                    j - 1,
                    aligned_query + query[i - 1],
                    aligned_target + target[j - 1],
                    operation + ("|" if is_match else "s"),
                )
            if top == min_dist:
                self.__get_all_optimal_alignments(
                    query,
                    target,
                    distance,
                    i - 1,
                    j,
                    aligned_query + query[i - 1],
                    aligned_target + "-",
                    operation + "d",
                )
            if left == min_dist:
                self.__get_all_optimal_alignments(
                    query,
                    target,
                    distance,
                    i,
                    j - 1,
                    aligned_query + "-",
                    aligned_target + target[j - 1],
                    operation + "i",
                )


## Minimum Edit Distance Example

In [5]:
str1 = "TELEPHONE"
str2 = "ELEPHANT"

med = MinimumEditDistance(1, 1, 2)
alignment = med.align(str1, str2)

### Edit Distance

In [6]:
print(
    "Edit Distance : ",
    alignment["wagner_fischer_results"]["edit_distance"],
)
print()

Edit Distance :  5



### Distance Matrix

In [7]:
print("Edit Distance Matrix :")
med.niceMatrix(
    alignment["wagner_fischer_results"]["distance_matrix"]
)
print()

Edit Distance Matrix :
   0   1   2   3   4   5   6   7   8
   1   2   3   4   5   6   7   8   7
   2   1   2   3   4   5   6   7   8
   3   2   1   2   3   4   5   6   7
   4   3   2   1   2   3   4   5   6
   5   4   3   2   1   2   3   4   5
   6   5   4   3   2   1   2   3   4
   7   6   5   4   3   2   3   4   5
   8   7   6   5   4   3   4   3   4
   9   8   7   6   5   4   5   4   5



### Single Optimal Alignment

In [8]:
print("Single Optimal Alignment : ")
med.niceAlignment(alignment["single_optimal_alignment"])
print()

Single Optimal Alignment : 
T E L E P H O N E
d | | | | | s | s
- E L E P H A N T



### All Optimal Alignments

In [9]:
print("All Optimal Alignments : ")
for i in range(len(alignment["all_optimal_alignments"])):
    optimal_alignment = alignment["all_optimal_alignments"][i]
    print("-"*30)
    print("Alignment #",i + 1)
    print("-"*30)
    med.niceAlignment(optimal_alignment)

All Optimal Alignments : 
------------------------------
Alignment # 1
------------------------------
T E L E P H O N E
d | | | | | s | s
- E L E P H A N T
------------------------------
Alignment # 2
------------------------------
T E L E P H - O N E
d | | | | | i d | s
- E L E P H A - N T
------------------------------
Alignment # 3
------------------------------
T E L E P H O - N E
d | | | | | d i | s
- E L E P H - A N T
------------------------------
Alignment # 4
------------------------------
T E L E P H O N - E
d | | | | | s | i d
- E L E P H A N T -
------------------------------
Alignment # 5
------------------------------
T E L E P H - O N - E
d | | | | | i d | i d
- E L E P H A - N T -
------------------------------
Alignment # 6
------------------------------
T E L E P H O - N - E
d | | | | | d i | i d
- E L E P H - A N T -
------------------------------
Alignment # 7
------------------------------
T E L E P H O N E -
d | | | | | s | d i
- E L E P H A N - T
----------------

## AutoCorrect class

In [10]:
class AutoCorrect:
    def __init__(self, word_corpus: str) -> None:
        """Creates an instance for the class."""
        try:
            with open(word_corpus, "r") as inputFile:
                self.__word_corpus = inputFile.read()
                self.load_corpus()
        except IOError:
            print("Couldn't read input corpus.")
            exit(1)

    @staticmethod
    def tokenize(text: str) -> List:
        """List all the word tokens (consecutive letters) in a text.
        Normalize to lowercase."""
        return re.findall("[a-z]+", text.lower())

    @functools.lru_cache(maxsize=100)
    def load_corpus(self) -> None:
        """Tokenize the corpus and load it into a Counter."""
        self.CORPUS = self.tokenize(self.__word_corpus)
        self.COUNTS = Counter(self.CORPUS)
        self.UNIQUE_WORDS = sorted(set(self.CORPUS))

    def print_info(
        self, candidates: List, edit_distances: List, probabilites: List
    ) -> None:
        """Helper function for printing viable candidates, their frequencies and
        edit_distances when use_frequency=True.
        """
        print(
            "\nUsing a dictionary of ",
            len(self.UNIQUE_WORDS),
            " unique words \nand a total of ",
            len(self.CORPUS),
            " words to predict the most probable word...\n",
        )

        l = list(zip(candidates, probabilites, edit_distances))

        # sort by frequency
        l = sorted(l, key=lambda t: t[1], reverse=True)

        # sort by edit distance
        l = sorted(l, key=lambda t: t[2])

        print("List of viable candidates :")
        print("-" * 80)
        print(
            "{:>20}{:>20}{:>20}".format(
                "candidate", "frequency", "edit_distance"
            )
        )
        print("-" * 80)
        for c, p, d in l:
            print("{:>20}{:>20}{:>20}".format(c, p, d))
        print("-" * 80)

    def autocorrect(
        self,
        word: str,
        threshold: Optional[int] = 2,
        use_frequency: Optional[bool] = True,
    ) -> Any:
        """
        Given a word w, find the most likely correction c = correct(w).

        Approach: Try all candidate words c that are known words that are 'near'
        w. Choose the most 'likely' one.

        To balance near and likely, in a trivial way: Measure nearness by
        edit distance <= threshold, and choose the most likely word from the
        given text by frequency.

        Reference :
        http://nbviewer.jupyter.org/url/norvig.com/ipython/How%20to%20Do%20Things%20with%20Words.ipynb
        """
        word = word.lower()

        # if the word is in the loaded corpus,
        if word in self.UNIQUE_WORDS:
            print("No corrections required.")
            return word

        med = MinimumEditDistance(
            insertion_cost=1, deletion_cost=1, substitution_cost=2
        )

        candidates = []
        edit_distances = []
        for candidate in self.UNIQUE_WORDS:
            weightedWagnerFischerResults = med.weightedWagnerFischer(
                word, candidate
            )
            distance = weightedWagnerFischerResults["edit_distance"]
            if distance <= threshold:
                candidates.append(candidate)
                edit_distances.append(distance)

        if not candidates:
            print("No viable correction found for given word.")
            return word

        if not use_frequency:
            return candidates

        else:
            probabilites = []
            for possible in candidates:
                probabilites.append(self.COUNTS[possible])
            l = list(zip(candidates, probabilites, edit_distances))

            # prefer edits that are 1 edit away
            edit1 = [c for c, p, d in l if d == 1]
            if edit1:
                viable_candidate = max(edit1, key=self.COUNTS.get)

            # if there are no words 1 edit away, consider all words
            else:
                viable_candidate = max(candidates, key=self.COUNTS.get)

            return viable_candidate, candidates, edit_distances, probabilites


## AutoCorrect example, using input dictionary 

In [11]:
autocorrection_tool = AutoCorrect(data_dir+data_file_2)
test1 = "sui"
(
    x1,
    candidates,
    edit_distances,
    probabilites,
) = autocorrection_tool.autocorrect(test1, threshold=2, use_frequency=True)

In [12]:
autocorrection_tool.print_info(
    candidates, edit_distances, probabilites
)
print("Given word : ", test1)
print("Chosen candidate(s) : ", x1)
print()
print()
print("-" * 80)
print()


Using a dictionary of  23683  unique words 
and a total of  928012  words to predict the most probable word...

List of viable candidates :
--------------------------------------------------------------------------------
           candidate           frequency       edit_distance
--------------------------------------------------------------------------------
                suit                 145                   1
                suis                   4                   1
                  si                   3                   1
                  su                   2                   1
                   i               22538                   2
                   s                7723                   2
                 sir                2764                   2
                 sun                 237                   2
                 sit                 214                   2
                 sin                 159                   2
                 six      

## AutoCorrect example, without dictionary

In [13]:
autocorrection_tool = AutoCorrect(data_dir+data_file_2)
test2 = "sui"
x2 = autocorrection_tool.autocorrect(test2, threshold=2, use_frequency=False)
print("Given word : ", test2)
print("Chosen candidate(s) : ", x2)
print()

Given word :  sui
Chosen candidate(s) :  ['i', 'oui', 'qui', 's', 'si', 'sin', 'sip', 'sir', 'sit', 'six', 'siz', 'sluic', 'su', 'sub', 'sue', 'sug', 'suis', 'suit', 'suits', 'sum', 'sun', 'sup', 'sur', 'u']

