EXERCISE 3

In [None]:
#!pip install nltk

In [None]:
import nltk
# nltk.download('reuters')
# nltk.download('punkt')
from nltk.corpus import reuters
from nltk.probability import FreqDist
from sklearn.model_selection import train_test_split
from collections import Counter
from nltk.util import ngrams
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
import math



In [None]:
# Load the 'reuters' corpus
sentences = reuters.sents()

In [None]:
# Splitting data into Training, Development and Test set
train_sents, test_sents = train_test_split(reuters.sents(), test_size=0.3, random_state=42)
dev_sents, test_sents = train_test_split(test_sents, test_size=0.5, random_state=42)


In [None]:
print(f'Number of sentences in train set: {len(train_sents)}')

Number of sentences in train set: 38301


In [None]:
# Transform the train sentences into words
train_words = [word for sentence in train_sents for word in sentence]
freq_dist_train = FreqDist(train_words)

In [None]:
cleaned_train_sentences = []
for sentence in train_sents:
    cleaned_train_sentence = [word if freq_dist_train[word] > 2 else '<UNK>' for word in sentence]
    cleaned_train_sentences.append(cleaned_train_sentence)

Now lets Build our model

In [None]:
from collections import Counter
from nltk.util import ngrams
from pprint import pprint

unigram_counter = Counter()
bigram_counter = Counter()
trigram_counter = Counter()

for sent in cleaned_train_sentences:
    unigram_counter.update([gram for gram in ngrams(sent, 1, pad_left=True, pad_right=True,
                                                   left_pad_symbol='<s>',right_pad_symbol='<e>') ])
    bigram_counter.update([gram for gram in ngrams(sent, 2, pad_left=True, pad_right=True,
                                                   left_pad_symbol='<s>',right_pad_symbol='<e>') ])
    trigram_counter.update([gram for gram in ngrams(sent, 3, pad_left=True, pad_right=True,
                                                   left_pad_symbol='<s>',right_pad_symbol='<e>') ])
# pprint(unigram_counter.most_common(10))
# pprint(bigram_counter.most_common(10))
print('Most common trigrams')
pprint(trigram_counter.most_common(10))

Most common trigrams
[(('.', '<e>', '<e>'), 34142),
 (('<s>', '<s>', 'The'), 6167),
 (('&', 'lt', ';'), 6054),
 (('said', '.', '<e>'), 5580),
 (('U', '.', 'S'), 3977),
 (('.', 'S', '.'), 3726),
 (('<s>', '<s>', '<UNK>'), 3103),
 (('lt', ';', '<UNK>'), 3002),
 (('<s>', '<s>', '"'), 2528),
 ((';', '<UNK>', '>'), 1957)]


In [None]:
# Build the vocab
vocab = [word[0] for word in unigram_counter]
print(f'Number of tokens in train set: {len(vocab)}')

Number of tokens in train set: 16516


In [None]:
def calculate_ngram_probability(ngram_counter, ngram_minus_one_counter, ngram, alpha, vocab_size):
    """
    Calculate bigram probability with Laplace smoothing
    :param ngram_counter: Counter which the key is a tuple of ngram and value its frequency
    :param ngram_minus_one_counter: Counter which the key is a tuple of n-1gram and value its frequency
    :param ngram: tuple
    :param alpha: float hyperparameter for Laplace smoothing
    :param vocab_size: int value which defines the whole size of the corpus
    :return: float probability of the ngram inside the corpus
    """
    ngram_count = ngram_counter[ngram]
    context = ngram[:-1]
    ngram_minus_one_count = ngram_minus_one_counter[context]
    ngram_prob = (ngram_count + alpha) / (ngram_minus_one_count + (alpha * vocab_size))
    # Convert to log probability
    ngram_prob = math.log2(ngram_prob)
    return ngram_prob

# BIGRAM GENERATE NEXT WORD GIVEN SEQUENCE

In [None]:
def generate_candidates(state, ngram_counter, model):
    """
    Given the state calculate the next possible words
    - state: The current word sequence
    - ngram_counter: Counter which the key is a tuple of n-1gram and value its frequency

    Returns:
    - Next state
    """
    # if ngram_counter = trigram_counter
    ngram_width = 1
    if model == 'trigram':
      ngram_width = 2
    prev_words = tuple(state[-ngram_width:])

    # Find candidates words
    next_words = [prev_words_tuple[-1] for (prev_words_tuple) in ngram_counter if prev_words == tuple(prev_words_tuple[:-1])]

    return [state + [next_word] for next_word in next_words]



In [None]:
def score(state, vocab_size, alpha, ngram_counter, ngram_minus_one_counter, model='trigram', dist=0, l1=1, l2=0, calculate_ngram_probability_fn=calculate_ngram_probability):
    """
    Calculate the log probability  of the word sequence

    Parameters:
    - state: The current word sequence.
    - vocab_size: The size of the vocabulary
    - alpha: float hyperparameter for Laplace smoothing
    - ngram_counter:
    - ngram_minus_one_counter
    - dist: int distance between words. Only for spell correcting.
    - l1: float hyperparameter for weighting the model. Deffault=1
    - l2: float hyperparameter for weigthing the distance. Deffault=0
    - calculate_ngram_probability_fn:

    Returns:
    - Log Probability
    """
    ngram_width = 1
    if model == 'trigram':
      ngram_width = 2
    probability = 0
    for i in range(ngram_width, len(state)):

        prev_words = tuple(state[i-ngram_width:i])

        probability += l1 * calculate_ngram_probability_fn(ngram_counter, ngram_minus_one_counter,(prev_words),alpha, vocab_size) + l2 * math.log2(1 / (dist + 1))
    return probability

In [None]:
def beam_search_sequence(initial_state, max_depth, beam_width, vocab_size, alpha, ngram_counter, ngram_minus_one_counter, generate_candidates_fn, score_fn):
    candidates = [(initial_state, 0)]

    for depth in range(max_depth):
        new_candidates = []
        for candidate, prob in candidates:
            for next_state in generate_candidates_fn(candidate, bigram_counter, 'bigram'):

                new_prob = prob + score_fn(next_state, vocab_size, alpha, ngram_counter, ngram_minus_one_counter,'bigram')
                new_candidates.append((next_state, new_prob))



        new_candidates = sorted(new_candidates, key=lambda x: x[1], reverse=True)
        candidates = new_candidates[:beam_width]
        print(candidates)

    best_sequence, best_prob = max(candidates, key=lambda x: x[1])
    return best_sequence


In [None]:
test_sentence = " I would like to come to"
initial_state = test_sentence.split(' ')[-1:]
max_depth = 10
beam_width = 5
best_sequence = beam_search_sequence(initial_state, max_depth, beam_width,len(vocab),0.01,bigram_counter,unigram_counter, generate_candidates, score)

print(test_sentence, ' '.join(best_sequence[1:]))  # Excluding the "<start>" token

[(['to', 'be'], -14.011576703176177), (['to', 'common'], -14.011576703176177), (['to', 'determine'], -14.011576703176177), (['to', 'continue'], -14.011576703176177), (['to', 'respond'], -14.011576703176177)]
[(['to', 'be', 'identified'], -42.03473010952853), (['to', 'be', 'better'], -42.03473010952853), (['to', 'be', 'slightly'], -42.03473010952853), (['to', 'be', 'created'], -42.03473010952853), (['to', 'be', 'nominated'], -42.03473010952853)]
[(['to', 'be', 'identified', ','], -84.06946021905706), (['to', 'be', 'identified', 'said'], -84.06946021905706), (['to', 'be', 'identified', '."'], -84.06946021905706), (['to', 'be', 'identified', 'as'], -84.06946021905706), (['to', 'be', 'identified', 'the'], -84.06946021905706)]
[(['to', 'be', 'identified', ',', 'none'], -140.11576703176178), (['to', 'be', 'identified', ',', 'said'], -140.11576703176178), (['to', 'be', 'identified', ',', 'they'], -140.11576703176178), (['to', 'be', 'identified', ',', 'its'], -140.11576703176178), (['to', 'be'

TRIGRAM GENERATE NEXT WORD GIVEN SEQUENCE

In [None]:
#TRIGRAM MODEL

In [None]:
def beam_search_decode(initial_state, max_depth, beam_width, generate_candidates_fn, score_fn):
    """
    Generate candidate words for a misspelled word, using between words distance.

    Parameters:
    - state: The current state.
    - word: The misspelled word.
    - word_list: List of words to search for candidates.
    - max_candidates: Maximum number of candidates
    - distance_fn: Distance function. Deffault damerau_levenshtein_distance

    Returns:
    - A list of candidate words.
    """
    candidates = [(initial_state, 1.0)]

    for depth in range(max_depth):
        new_candidates = []
        for candidate, prob in candidates:
            for next_state in generate_candidates_fn(candidate, trigram_counter, 'trigram'):
                new_prob = prob + score_fn(next_state, len(vocab), 0.01, trigram_counter, bigram_counter, 'trigram')
                new_candidates.append((next_state, new_prob))



        new_candidates = sorted(new_candidates, key=lambda x: x[1], reverse=True)

        candidates = new_candidates[:beam_width]

    best_sequence, best_prob = max(candidates, key=lambda x: x[1])
    print(best_sequence[-1],end=" ")
    return best_sequence


test_sentence = "I would like to"
initial_state = test_sentence.split(' ')[-2:]
max_depth = 20
beam_width = 3
print(test_sentence, end=" ")
best_sequence = beam_search_decode(initial_state, max_depth, beam_width, generate_candidates, score)
print(' '.join(best_sequence))

I would like to few like to see where else they can ' t expect that high level of public - sector consumption in the first few


In [None]:
import time

test_sentence = "I would like to"
initial_state = test_sentence.split(' ')[-2:]
max_depth = 20
beam_width = 3
print(test_sentence, end=" ")
best_sequence = beam_search_decode(initial_state, max_depth, beam_width, generate_candidates, score)

for i in range(2,len(best_sequence)):
  print(best_sequence[i], end=" ")  # Excluding the 2 first <start>" tokens
  time.sleep(0.5)

I would like to few see where else they can ' t expect that high level of public - sector consumption in the first few 

# Spelling Corrector

In [None]:
# Leveinstein Destance with transposition.
def damerau_levenshtein_distance(s1, s2):
    """
    Calculate the Damerau–Levenshtein distance between two strings.

    Parameters:
    - s1: first string
    - s2: second string

    Returns:
    - Damerau Levenshtein distance
    """
    len_s1 = len(s1)
    len_s2 = len(s2)
    d = [[0] * (len_s2 + 1) for _ in range(len_s1 + 1)]

    for i in range(len_s1 + 1):
        d[i][0] = i
    for j in range(len_s2 + 1):
        d[0][j] = j

    for i in range(1, len_s1 + 1):
        for j in range(1, len_s2 + 1):
            cost = 0 if s1[i - 1] == s2[j - 1] else 1
            d[i][j] = min(
                d[i - 1][j] + 1,  # deletion
                d[i][j - 1] + 1,  # insertion
                d[i - 1][j - 1] + cost,  # substitution
            )
            if i > 1 and j > 1 and s1[i - 1] == s2[j - 2] and s1[i - 2] == s2[j - 1]:
                d[i][j] = min(d[i][j], d[i - 2][j - 2] + cost)  # transposition

    return d[len_s1][len_s2]


In [None]:
# Take a word and the vocab and produce candidates/
def generate_candidate_with_distance(state, word, word_list, max_candidates=5, distance_fn=damerau_levenshtein_distance):
    """
    Generate candidate words for a misspelled word, using between words distance.

    Parameters:
    - state: The current state.
    - word: The misspelled word.
    - word_list: List of words to search for candidates.
    - max_candidates: Maximum number of candidates
    - distance_fn: Distance function. Deffault damerau_levenshtein_distance

    Returns:
    - A list of candidate words.
    """
    candidates = []

    for candidate in word_list:
        distance = distance_fn(word, candidate)

        candidates.append((candidate, distance))

    # Sort candidates by Distance distance in ascending order
    candidates.sort(key=lambda x: x[1])
    next_words = candidates[:max_candidates]

    # Return next word and distance
    return [(state + [next_word[0]], next_word[1]) for next_word in next_words]

# Example usage
misspelled_word = "candidat"
initial_state = ['<s>','<s>']
candidates = generate_candidate_with_distance(initial_state,misspelled_word, vocab, 10)
print(f"Candidate words for '{misspelled_word}': {candidates}")

Candidate words for 'candidat': [(['<s>', '<s>', 'candidate'], 1), (['<s>', '<s>', 'candidates'], 2), (['<s>', '<s>', 'canadian'], 3), (['<s>', '<s>', 'credit'], 4), (['<s>', '<s>', 'scandal'], 4), (['<s>', '<s>', 'confident'], 4), (['<s>', '<s>', 'capital'], 4), (['<s>', '<s>', 'Canada'], 4), (['<s>', '<s>', 'Canadian'], 4), (['<s>', '<s>', 'consider'], 4)]


In [None]:
# def score(state, vocab_size, alpha, ngram_counter, ngram_minus_one_counter, dist=0, l1=1, l2=0, calculate_ngram_probability_fn=calculate_ngram_probability):
#     """
#     Calculate the log probability  of the word sequence

#     Parameters:
#     - state: The current word sequence.
#     - vocab_size: The size of the vocabulary
#     - alpha: float hyperparameter for Laplace smoothing
#     - ngram_counter:
#     - ngram_minus_one_counter
#     - dist: int distance between words. Only for spell correcting.
#     - l1: float hyperparameter for weighting the model. Deffault=1
#     - l2: float hyperparameter for weigthing the distance. Deffault=0
#     - calculate_ngram_probability_fn:

#     Returns:
#     - Log Probability
#     """
#     probability = 0
#     for i in range(2, len(state)):

#         prev_words = tuple(state[i-len(state):])
#         probability += l1 * calculate_ngram_probability_fn(ngram_counter, ngram_minus_one_counter,(prev_words),alpha, vocab_size) + l2 * math.log2(1 / (dist + 1))
#     return probability

In [None]:
def beam_search_spelling(sentence, beam_width, l1, l2, generate_candidates_fn, score_fn):
    """
    Spelling correction with contect awereness using beam search

    Parameters:
    - sentence: The sentence we try to correct
    - beam_width: The width of beam search algorithm.
    - generate_candidates_fn: function that generates candidate words
    - score_fn: Function that calculates the log probability

    Returns:
    - The most probable sequence corrected.
    """

    initial_state = ['<s>','<s>']
    candidates = [(initial_state, 0)]
    # sentence = word_tokenize(sentence)
    max_depth = len(sentence)
    for depth in range(max_depth):
        new_candidates = []
        for candidate, prob in candidates:
            for next_state, dist in generate_candidates_fn(candidate, sentence[depth],vocab):

                # Prob we add the previous prob, the prob of the next state and the inverse of the distance
                new_prob = prob + score_fn(next_state,len(vocab),0.01, trigram_counter, bigram_counter, dist, l1=0.2, l2=0.8)

                new_candidates.append((next_state, new_prob))


        new_candidates = sorted(new_candidates, key=lambda x: x[1], reverse=True)

        candidates = new_candidates[:beam_width]
        # print(candidates)
    best_sequence, best_prob = max(candidates, key=lambda x: x[1])
    return best_sequence[2:]


test_sentence = word_tokenize("I ae coming to down")
beam_width = 5
best_sequence = beam_search_spelling(test_sentence, beam_width, 0.2, 0.8, generate_candidate_with_distance, score)
print(' '.join(best_sequence))  # Excluding the "<start>" token

I be coming to down


In [None]:
print(best_sequence[2:])

['I', 'are', 'coming', 'to', 'down']


# EVALUATE

In [None]:
import random

def replace_characters(sentence, probability):
    modified_sentence = []
    for word in sentence:
        modified_word = ''
        for char in word:
            if char != ' ' and random.random() < probability:
                # Replace non-space character with a visually or acoustically similar character
                # You can customize this part based on your preference or use external libraries for similarity
                modified_char = get_similar_char(char)
                modified_word += modified_char
            else:
                modified_word += char
        modified_sentence.append(modified_word)
    return modified_sentence

def get_similar_char(char):
    # Replace this with your logic to get a visually or acoustically similar character
    # For simplicity, using a basic example here (you can expand this based on your requirements)
    similar_chars = {'a': 'e', 'b': 'd', 'c': 'e', 'd': 'b', 'e': 'a', 'f': 'g',
                     'g': 'f', 'h': 'i', 'i': 'h', 'j': 'k', 'k': 'j', 'l': 'm',
                     'm': 'l', 'n': 'o', 'o': 'n', 'p': 'q', 'q': 'p', 'r': 's',
                     's': 'r', 't': 'u', 'u': 't', 'v': 'w', 'w': 'v', 'x': 'y',
                     'y': 'x', 'z': 'z'}
    return similar_chars.get(char, char)

def modify_corpus(corpus, probability):
    modified_corpus = []
    for sentence in corpus:
        modified_sentence = replace_characters(sentence, probability)
        modified_corpus.append(modified_sentence)
    return modified_corpus

# Example usage with a probability of 0.1 (10% chance of replacing each non-space character)
modified_test_corpus = modify_corpus(test_sents, 0.1)

print_cnt = 0
for sent in modified_test_corpus:
  print_cnt +=1
  print(sent)
  print("______________________")
  if print_cnt == 20:
    break;

['The', 'commission', 'hs', 'expeeted', 'to', 'approve', 'tha', 'appmication', 'at', 'a', 'meeting', 'unmorrow', '.']
______________________
['"', 'The', 'United', 'States', 'and', 'the', 'rix', 'major', 'iodtsurial', 'countries', 'are', 'fully', 'commitued', 'to', 'impmameoting', 'our', 'undestakings', 'in', 'uhese', 'agreements', ',"', 'Baker', 'told', 'the', 'meetings', '.']
______________________
['Soo', 'Lioe', 'said', 'in', 'Jaouary', 'it', 'was', 'seeking', 'bids', 'for', 'tha', 'qrnperty', '.']
______________________
['Cein', '-', 'Slnan', 'has', 'four', 'rtoras', 'hn', 'Nashville', '.']
______________________
['Romero', 'said', 'he', 'woumb', 'telm', 'big', 'duyers', 'nf', 'copre', 'meal', 'in', 'London', 'thet', 'the', 'Philippines', 'was', 'dning', 'ius', 'best', 'to', 'meet', 'EC', 'standards', '.']
______________________
['Pretax', 'profits', 'also', 'bipqeb', 'uo', '601', '.', '7', 'mmn', 'stg', 'after', '614', '.', '4', 'mln', '.']
______________________
['Asked', 'in', 

In [None]:
import numpy as np

# Take a portion of the test_corpus and modified_test_corpus
org_sent = test_sents[:5]
wrg_sent = modified_test_corpus[:5]

def correct_corpus_np(corpus, vocab, max_candidates=5):
    corrected_corpus = []
    for sentence in corpus:
        corrected_sentence = beam_search_spelling(sentence, 3,0.2,0.8, generate_candidate_with_distance, score)
        print(corrected_sentence)
        corrected_corpus.append(corrected_sentence)
    return corrected_corpus

corrected_test_corpus = correct_corpus_np(wrg_sent, vocab, 5)

print("Original Test Corpus:")
print(org_sent)

print("\nModified Test Corpus:")
print(wrg_sent)

print("\nCorrected Test Corpus:")
print(corrected_test_corpus)


['The', 'commission', 'has', 'expected', 'to', 'approve', 'that', 'application', 'at', 'a', 'meeting', 'tomorrow', '.']
['"', 'The', 'United', 'States', 'and', 'the', 'six', 'major', 'industrial', 'countries', 'are', 'fully', 'committed', 'to', 'implementing', 'our', 'undertakings', 'in', 'These', 'agreements', ',"', 'Baker', 'told', 'the', 'meetings', '.']
['Soo', 'Life', 'said', 'in', 'January', 'it', 'was', 'seeking', 'bids', 'for', 'that', 'property', '.']
['Cain', '-', 'plan', 'has', 'four', 'Stores', 'an', 'Nashville', '.']
['Romero', 'said', 'he', 'would', 'term', 'big', 'buyers', 'of', 'copra', 'meal', 'in', 'London', 'that', 'the', 'Philippines', 'was', 'doing', 'its', 'best', 'to', 'meet', 'EC', 'standards', '.']
Original Test Corpus:
[['The', 'commission', 'is', 'expected', 'to', 'approve', 'the', 'application', 'at', 'a', 'meeting', 'tomorrow', '.'], ['"', 'The', 'United', 'States', 'and', 'the', 'six', 'major', 'industrial', 'countries', 'are', 'fully', 'committed', 'to', 

In [None]:
# !pip install evaluate
# !pip install jiwer
from evaluate import load

# Flatten the list of lists
flattened_corrected_test_corpus = [' '.join(sentence) for sentence in corrected_test_corpus]
flattened_org_sent = [' '.join(sentence) for sentence in org_sent]

# Transform predictions
predictions = [' '.join(flattened_corrected_test_corpus)]
references = [' '.join(flattened_org_sent)]

wer = load("wer")  # Load Word-Error-Rate metric
wer_score = wer.compute(predictions=predictions, references=references)
print(f"WER score is: {wer_score}")

cer = load("cer")
cer_score = cer.compute(predictions=predictions, references=references)
print(f"CER score is: {cer_score}")


# Flatten the list of lists
flattened_corrected_test_corpus = [' '.join(sentence) for sentence in wrg_sent]
flattened_org_sent = [' '.join(sentence) for sentence in org_sent]

# Transform predictions
predictions = [' '.join(flattened_corrected_test_corpus)]
references = [' '.join(flattened_org_sent)]

wer = load("wer")  # Load Word-Error-Rate metric
wer_score = wer.compute(predictions=predictions, references=references)
print(f"WER score is: {wer_score}")

cer = load("cer")
cer_score = cer.compute(predictions=predictions, references=references)
print(f"CER score is: {cer_score}")


WER score is: 0.10588235294117647
CER score is: 0.029787234042553193
WER score is: 0.3176470588235294
CER score is: 0.07446808510638298
