In [137]:
import torch
from torch.utils.data import Dataset
import multiprocessing
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import time

Lets try using just the trigram NN to predict missing words with missing characters, and then the trigram + a kind of spellcheck

In [138]:
def find_candidates(word, vocabulary, word_to_id_map, missing_token="@"):
    candidates = []

    for vocab_word in vocabulary:
        if len(word) != len(vocab_word):
            continue  # Skip words with different lengths

        candidate = []
        for char1, char2 in zip(word, vocab_word):
            if char1 == missing_token:
                candidate.append(char2)
            elif char1 == char2:
                candidate.append(char2)  
            else:
                break #mismatch, skip
        else:
            try:
                candidates.append(word_to_id_map["".join(candidate)])
            except:
                print(word, vocab_word, candidate,"".join(candidate) )
                raise Exception

    return candidates

In [139]:
#load wikitext data

train = []
test = []

train_file_path = '../newtraincorpus.txt'
test_file_path = '../newtestcorpus.txt'

with open(train_file_path, 'r', encoding='utf-8') as file:
    for line in file:
            # Process each line as a sentence
            words = (line.strip().split())
            train.append(words)

with open(test_file_path, 'r', encoding='utf-8') as file:
    for line in file:
            # Process each line as a sentence
            words = (line.strip().split())
            test.append(words)

print(len(train))
print(len(test))

77363
9418


In [140]:
#create vocabulary

UNK_symbol = "<UNK>"
vocab = set([UNK_symbol])



# create term frequency of the words
words_term_frequency_train = {}
for doc in train:
    for word in doc:
        # this will calculate term frequency
        # since we are taking all words now
        words_term_frequency_train[word] = words_term_frequency_train.get(word,0) + 1

# create vocabulary
for doc in train:
    for word in doc:
        if words_term_frequency_train.get(word,0) >= 5:
            vocab.add(word)

# remove "@-@" from vocab
vocab.remove("@-@")

print(len(vocab))

19114


In [141]:
#create trigrams, just need the test set for this

import numpy as np

x_test = []
y_test = []


# create word to id mappings
word_to_id_mappings = {}
for idx,word in enumerate(vocab):
    word_to_id_mappings[word] = idx

id_to_word_mappings = {v: k for k, v in word_to_id_mappings.items()}

# function to get id for a given word
# return <UNK> id if not found
def get_id_of_word(word):
    unknown_word_id = word_to_id_mappings['<UNK>']
    return word_to_id_mappings.get(word,unknown_word_id)


for sentence in test:
    for i,word in enumerate(sentence):
        if i+2 >= len(sentence):
            # sentence boundary reached
            # ignoring sentence less than 3 words
            break
        # convert word to id
        x_extract = [get_id_of_word(word),get_id_of_word(sentence[i+1])]
        y_extract = [get_id_of_word(sentence[i+2])]

        x_test.append(x_extract)
        y_test.append(y_extract)
  


x_test = np.array(x_test)
y_test = np.array(y_test)  
  
print(x_test.shape)
print(y_test.shape)

(174126, 2)
(174126, 1)


In [142]:
#this dataset will add a version of the label, (third word in the trigram), with a single character removed 

import random

def randomly_replace_char(input_str, missing_token="@"):
    if not input_str:
        return input_str  # Return the input string unchanged if it's empty or less than 2

    index_to_replace = random.randint(0, len(input_str) - 1)
    modified_str = (
        input_str[:index_to_replace] + missing_token + input_str[index_to_replace + 1:]
    )

    return modified_str

class charMaskDataset(Dataset):
    def __init__(self, data, labels, id_to_word):
        """
        Args:
            data (list): List of input data samples.
            labels (list): List of corresponding labels.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.data = data
        self.labels = labels
        self.id_to_word = id_to_word

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """
        Args:
            index (int): Index of the data sample.

        Returns:
            tuple: (data_sample, label) where data_sample is the input data and label is the corresponding label.
        """
        context = self.data[index]
        label = self.labels[index]


        label_word = self.id_to_word[label[0]]

        #randomly remove a single char from the label word

        masked_word = randomly_replace_char(label_word)



        return torch.tensor(context).type(torch.LongTensor), torch.tensor(label).type(torch.LongTensor), masked_word


In [143]:
# Trigram Neural Network Model
class TrigramNNmodel(nn.Module):

    def __init__(self, vocab_size, embedding_dim, context_size, h):
        super(TrigramNNmodel, self).__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(context_size * embedding_dim, h)
        self.linear2 = nn.Linear(h, vocab_size, bias = False)

    def forward(self, inputs):
        # compute x': concatenation of x1 and x2 embeddings
        embeds = self.embeddings(inputs).view((-1,self.context_size * self.embedding_dim))
        # compute h: tanh(W_1.x' + b)
        out = torch.tanh(self.linear1(embeds))
        # compute W_2.h
        out = self.linear2(out)
        # compute y: log_softmax(W_2.h)
        log_probs = F.log_softmax(out, dim=1)
        # return log probabilities
        # BATCH_SIZE x len(vocab)
        return log_probs

In [144]:
dataset = charMaskDataset(x_test, y_test, id_to_word_mappings)


BATCH_SIZE = 256


test_loader = DataLoader(dataset, batch_size = BATCH_SIZE)

In [158]:
# create parameters
gpu = 0 
# word vectors size
EMBEDDING_DIM = 200
CONTEXT_SIZE = 2
BATCH_SIZE = 256
# hidden units
H = 100


# check if gpu is available
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)


best_model = TrigramNNmodel(len(vocab), EMBEDDING_DIM, CONTEXT_SIZE, H)
best_model.load_state_dict(torch.load("models/best_model_1.dat"))
best_model.to(device)

cuda


TrigramNNmodel(
  (embeddings): Embedding(19114, 200)
  (linear1): Linear(in_features=400, out_features=100, bias=True)
  (linear2): Linear(in_features=100, out_features=19114, bias=False)
)

In [146]:
#evaluate just basic spellcheck

def eval_spellcheck(labels, masked_words, device):

    preds = []
    for word in masked_words:
        neighbors = find_candidates(word, vocab, word_to_id_mappings)
        
        #if there are results, just pick the closest word
        if len(neighbors):
            pred = neighbors[0]
        else: #just predict the unknown token
            pred = word_to_id_mappings["<UNK>"]
        
        preds.append(pred)
    
    preds = torch.tensor(preds).type(torch.LongTensor).to(device)

    return (preds == labels).float().mean()


def eval_spellcheck_with_NN(log_probs, labels, masked_words, device):

    preds = []
    for word in masked_words:
        neighbors = find_candidates(word, vocab, word_to_id_mappings)

        #NN probs
        probs = torch.exp(log_probs)
        
        # Sort indices based on probabilities, each indice corresponds to a word id
        sorted_indices = torch.argsort(probs, descending=True).tolist()

        #iterate through words first based on distance, and see how they match with the NN probs, the first match will be our pred

        b = False
        pred = sorted_indices[0] #default prediction if no match
        for neighbor in neighbors:
            for id in sorted_indices:
                if neighbor == id:
                    pred = id
                    b = True
                    break
            if b:
                break
        
        preds.append(pred)
           

    
    preds = torch.tensor(preds).type(torch.LongTensor).to(device)

    return (preds == labels).float().mean()


# helper function to get accuracy from log probabilities
def get_accuracy_from_log_probs(log_probs, labels):
    probs = torch.exp(log_probs)
    predicted_label = torch.argmax(probs, dim=1)
    acc = (predicted_label == labels).float().mean()
    return acc

In [147]:
# helper function to evaluate model on dev data
def evaluate(model, dataloader, device):
    model.eval()

    NN_mean_acc = 0
    SP_mean_acc = 0
    SPNN_mean_acc = 0
    count = 0

    with torch.no_grad():
        dev_st = time.time()
        for it, data_tensor in enumerate(dataloader):
            context_tensor = data_tensor[0]
            target_tensor = data_tensor[1]
            masked_words = data_tensor[2]

            context_tensor, target_tensor = context_tensor.to(device), target_tensor.to(device)
            log_probs = model(context_tensor)

            NN_mean_acc += get_accuracy_from_log_probs(log_probs, target_tensor)
            SP_mean_acc += eval_spellcheck(target_tensor, masked_words, device)
            SPNN_mean_acc += eval_spellcheck_with_NN(log_probs, target_tensor, masked_words, device)

            count += 1
            if it % 50 == 0: 
                print("Dev Iteration {} complete. NN_Mean Acc:{}; Time taken (s): {}".format(it, NN_mean_acc / count, (time.time()-dev_st)))
                print("Dev Iteration {} complete. SP_Mean Acc:{}; Time taken (s): {}".format(it, SP_mean_acc / count, (time.time()-dev_st)))
                print("Dev Iteration {} complete. SPNN_Mean Acc:{}; Time taken (s): {}".format(it, SPNN_mean_acc / count, (time.time()-dev_st)))
                dev_st = time.time()

    return NN_mean_acc / count, SP_mean_acc / count, SPNN_mean_acc / count

In [148]:
evaluate(best_model, test_loader, device)

Dev Iteration 0 complete. NN_Mean Acc:0.0; Time taken (s): 29.62201952934265
Dev Iteration 0 complete. SP_Mean Acc:0.0158233642578125; Time taken (s): 29.623019695281982
Dev Iteration 0 complete. SPNN_Mean Acc:5.2317671361379325e-05; Time taken (s): 29.62402057647705
Dev Iteration 50 complete. NN_Mean Acc:1.8549901142250746e-05; Time taken (s): 1495.7777783870697
Dev Iteration 50 complete. SP_Mean Acc:0.015069999732077122; Time taken (s): 1495.7797780036926
Dev Iteration 50 complete. SPNN_Mean Acc:5.231768227531575e-05; Time taken (s): 1495.7797780036926
Dev Iteration 100 complete. NN_Mean Acc:5.4387761338148266e-05; Time taken (s): 1499.3397767543793
Dev Iteration 100 complete. SP_Mean Acc:0.012779008597135544; Time taken (s): 1499.3407766819
Dev Iteration 100 complete. SPNN_Mean Acc:5.231764953350648e-05; Time taken (s): 1499.341777086258


KeyboardInterrupt: 

In [159]:
#explore some predictions

for it, data_tensor in enumerate(test_loader):
    context_tensor = data_tensor[0]
    target_tensor = data_tensor[1]
    masked_words = data_tensor[2]
    break


#see what the NN predicts
context_tensor, target_tensor = context_tensor.to(device), target_tensor.to(device)
best_model.eval()
log_probs = best_model(context_tensor)
probs = torch.exp(log_probs)
predicted_label = torch.argmax(probs, dim=1).tolist()
predicted_label = [id_to_word_mappings[l] for l in predicted_label]

In [169]:
#so its pretty clear that this model alone is dumb, and just predicts the same thing alot
context = [(id_to_word_mappings[l[0]], id_to_word_mappings[l[1]], predicted_label[i], masked_words[i], id_to_word_mappings[target_tensor.tolist()[i][0]] ) for i, l in enumerate(context_tensor.tolist())]
context

# context context prediction masked true

[('he', 'had', 'optional', '@', 'a'),
 ('had', 'a', 'champagne', '@uest', 'guest'),
 ('a', 'guest', 'champagne', '<U@K>', '<UNK>'),
 ('guest', '<UNK>', 'champagne', 'sta@ring', 'starring'),
 ('<UNK>', 'starring', 'median', 'r@le', 'role'),
 ('starring', 'role', 'metallic', '@n', 'on'),
 ('role', 'on', 'gaming', 'th@', 'the'),
 ('on', 'the', 'optional', 'televisio@', 'television'),
 ('the', 'television', 'champagne', 's@ries', 'series'),
 ('television', 'series', 'champagne', 'th@', 'the'),
 ('series', 'the', 'dual', 'bil@', 'bill'),
 ('the', 'bill', 'optional', 'i@', 'in'),
 ('bill', 'in', 'mcfadden', '<yea@>', '<year>'),
 ('this', 'was', 'optional', 'followe@', 'followed'),
 ('was', 'followed', 'champagne', 'b@', 'by'),
 ('followed', 'by', 'dual', '@', 'a'),
 ('by', 'a', 'champagne', 'st@rring', 'starring'),
 ('a', 'starring', 'dual', '@ole', 'role'),
 ('starring', 'role', 'metallic', 'i@', 'in'),
 ('role', 'in', 'gaming', 'th@', 'the'),
 ('in', 'the', 'champagne', 'p@ay', 'play'),
 (

In [172]:
#try just spellcheck

def spellcheck(masked_words):

    preds = []
    for word in masked_words:
        neighbors = find_candidates(word, vocab, word_to_id_mappings)
        
        #if there are results, just pick the closest word
        if len(neighbors):
            pred = neighbors[0]
        else: #just predict the unknown token
            pred = word_to_id_mappings["<UNK>"]
        
        preds.append(pred)
    return preds

preds = spellcheck(masked_words)

sp = [(id_to_word_mappings[l[0]], id_to_word_mappings[l[1]], id_to_word_mappings[preds[i]], masked_words[i], id_to_word_mappings[target_tensor.tolist()[i][0]] ) for i, l in enumerate(context_tensor.tolist())]
sp

[('he', 'had', 'x', '@', 'a'),
 ('had', 'a', 'guest', '@uest', 'guest'),
 ('a', 'guest', '<UNK>', '<U@K>', '<UNK>'),
 ('guest', '<UNK>', 'starring', 'sta@ring', 'starring'),
 ('<UNK>', 'starring', 'role', 'r@le', 'role'),
 ('starring', 'role', 'rn', '@n', 'on'),
 ('role', 'on', 'thx', 'th@', 'the'),
 ('on', 'the', 'television', 'televisio@', 'television'),
 ('the', 'television', 'series', 's@ries', 'series'),
 ('television', 'series', 'thx', 'th@', 'the'),
 ('series', 'the', 'bill', 'bil@', 'bill'),
 ('the', 'bill', 'is', 'i@', 'in'),
 ('bill', 'in', '<year>', '<yea@>', '<year>'),
 ('this', 'was', 'followed', 'followe@', 'followed'),
 ('was', 'followed', 'be', 'b@', 'by'),
 ('followed', 'by', 'x', '@', 'a'),
 ('by', 'a', 'stirring', 'st@rring', 'starring'),
 ('a', 'starring', 'cole', '@ole', 'role'),
 ('starring', 'role', 'is', 'i@', 'in'),
 ('role', 'in', 'thx', 'th@', 'the'),
 ('in', 'the', 'play', 'p@ay', 'play'),
 ('the', 'play', 'herons', 'heron@', 'herons'),
 ('play', 'herons', '

In [174]:
#this looks alot better, lets see how accurate it is

np.mean([1 if id_to_word_mappings[preds[i]] == id_to_word_mappings[target_tensor.tolist()[i][0]] else 0 for i, l in enumerate(context_tensor.tolist())])

0.609375

In [198]:
#now lets try combining these approaches

def spellcheck_with_NN(log_probs, masked_words):

    #NN probs
    probs = torch.exp(log_probs)
    
    # Sort indices based on probabilities, each indice corresponds to a word id
    sorted_indices = torch.argsort(probs, descending=True, axis=1).tolist()

    preds = []
    for i, word in enumerate(masked_words):
        neighbors = find_candidates(word, vocab, word_to_id_mappings)

        NN_preds = sorted_indices[i]

        #iterate through words first based on distance, and see how they match with the NN probs, the first match will be our pred

        highest_prob_index = len(NNSPpreds)
        pred = neighbors[0] #default prediction if no match
        for neighbor in neighbors:
            #only search up till previous highest probabilty match

            j = 0
            while j < highest_prob_index:
                id = NN_preds[j]
                if neighbor == id:
                    pred = id
                    highest_prob_index = j
                j += 1
  
  
        
        preds.append(pred)
           

    
    return preds

NNSPpreds = spellcheck_with_NN(log_probs, masked_words)

nnsp = [(id_to_word_mappings[l[0]], id_to_word_mappings[l[1]], id_to_word_mappings[NNSPpreds[i]], masked_words[i], id_to_word_mappings[target_tensor.tolist()[i][0]] ) for i, l in enumerate(context_tensor.tolist())]
nnsp

[('he', 'had', 'x', '@', 'a'),
 ('had', 'a', 'guest', '@uest', 'guest'),
 ('a', 'guest', '<UNK>', '<U@K>', '<UNK>'),
 ('guest', '<UNK>', 'starring', 'sta@ring', 'starring'),
 ('<UNK>', 'starring', 'role', 'r@le', 'role'),
 ('starring', 'role', 'rn', '@n', 'on'),
 ('role', 'on', 'thx', 'th@', 'the'),
 ('on', 'the', 'television', 'televisio@', 'television'),
 ('the', 'television', 'series', 's@ries', 'series'),
 ('television', 'series', 'thx', 'th@', 'the'),
 ('series', 'the', 'bill', 'bil@', 'bill'),
 ('the', 'bill', 'is', 'i@', 'in'),
 ('bill', 'in', '<year>', '<yea@>', '<year>'),
 ('this', 'was', 'followed', 'followe@', 'followed'),
 ('was', 'followed', 'be', 'b@', 'by'),
 ('followed', 'by', '–', '@', 'a'),
 ('by', 'a', 'stirring', 'st@rring', 'starring'),
 ('a', 'starring', 'cole', '@ole', 'role'),
 ('starring', 'role', 'is', 'i@', 'in'),
 ('role', 'in', 'thx', 'th@', 'the'),
 ('in', 'the', 'play', 'p@ay', 'play'),
 ('the', 'play', 'herons', 'heron@', 'herons'),
 ('play', 'herons', '

In [199]:
np.mean([1 if id_to_word_mappings[NNSPpreds[i]] == id_to_word_mappings[target_tensor.tolist()[i][0]] else 0 for i, l in enumerate(context_tensor.tolist())])

0.61328125

adding bengio only slightly improves it

note: try using something like word2vec as a supplement instead of bengio