In [1]:
#Necessary ml and ds package imports 
import torch
import torch.nn.functional as F #for the activation function 
from torch.utils.data import TensorDataset, DataLoader

#Importing wordNet for Vocabulary training 
import nltk
from nltk.corpus import wordnet
nltk.download('wordnet')
nltk.download('omw-1.4')
import random

random.seed(7) # for reproducability

[nltk_data] Downloading package wordnet to /Users/brijhoward-
[nltk_data]     sarin/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /Users/brijhoward-
[nltk_data]     sarin/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
  from .autonotebook import tqdm as notebook_tqdm
2024-02-11 20:45:34,578	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [2]:
#Tokenizer for definitions
from torchtext.transforms import BERTTokenizer
VOCAB_FILE = "https://huggingface.co/bert-base-uncased/resolve/main/vocab.txt"
tokenizer = BERTTokenizer(vocab_path=VOCAB_FILE, do_lower_case=True, return_tokens=True)

100%|██████████| 232k/232k [00:00<00:00, 1.23MB/s]


In [3]:
#Word masking function from string str
def maskWord(string): 

    #Set type to string (wordnet includes integers)
    if type(string) != str: 
        string = str(string)

    str_len = len(string)
    if str_len == 0: 
        return 
    ##random.setstate() <- helpful for reproducability later on 
    if str_len < 3: 
        choices = random.choices(range(str_len), k = 1)
    else: 
        choices = random.choices(range(str_len), k = random.randint(2,str_len - 1))

    ##Unpack string into a list of characters
    str_list = [*string]
    for index in choices: 
        str_list[index] = '_'
    newstr = ' '.join(str_list)
    return newstr

#maskWord('example')

In [4]:
#Helper functions with wordNet to get definitions

def getAllDefs(word): 
    defList = list()
    for i in range(len(wordnet.synsets(word))): 
        defList.append(wordnet.synsets(word)[i].definition())
    return defList

def getFirstDef(word): 
    return wordnet.synsets(word)[0].definition()
    



def getVocab(wordList): 
    vocab = [word for word in wordList]
    print(vocab)
    all_definitions = list() 
    for word in wordList: 
        all_definitions.append(getAllDefs(word))
    
    for definition in all_definitions:
        for eachDef in tokenizer(definition): 
            for word in eachDef: 
                vocab.append(word)
    vocab = set(vocab)
    return vocab 

In [5]:
#Separates all words in wordnet into training_set of size 20,000 and validation set of size 250 
all_words = set(word for synset in wordnet.all_synsets() for word in synset.lemma_names())
all_list = [word for word in all_words] #148730 total words 

#Randomly select 20,000 words to train on 


def sampleAndRemove(stringList, size): 
    sample = []
    sample_indices = (random.sample(range(0,len(stringList)), size))
    sample_dict = {}
    remaining_words = []

    # Uses dictionary of indices to remove to reduce compute time to large O(n) 
    for i in range(len(stringList)): 
        sample_dict[i] = 0
    for index in sample_indices: 
        sample_dict[index] = 1

    for index in range(len(stringList)): 
        if sample_dict[index] == 1: 
            sample.append(stringList[index])
        else: 
            remaining_words.append(stringList[index])
    return sample, remaining_words

#See if this new return syntax actually works. if not, go back to old list of two lists format 
trainset, remaining_words  =  sampleAndRemove(all_list, 20000)
print(len(trainset))
print(len(remaining_words))
validation_set, remaining_words = sampleAndRemove(remaining_words, 500)

20000
128730


In [6]:
def createVocab(wordset): 
    #Define primary lists based off words given 
    masked_words = [ maskWord(word) for word in wordset]
    print(masked_words[:2])
    definitions = [getFirstDef(word) for word in wordset]
    tokenized_defs = [tokenizer(phrase) for phrase in definitions]
    
    #Build out allwords, which will serve as full vocab 
    allwords = [word for word in wordset]
    for word in masked_words: 
        allwords.append(word)

    for definition in tokenized_defs: 
        for word in definition: 
            allwords.append(word)

    total_vocab = set(allwords)

    print(len(total_vocab))

    word2index = dict()
    index2word = dict()
    i = 0
    for word in total_vocab: 
        word2index[word] = i
        index2word[i] = word 
        i += 1
    return total_vocab, word2index, index2word, masked_words

total_vocab, word2index, index2word, all_masked = createVocab(all_list)
vocab_size = len(total_vocab)

['d i _ _ _ m i t _', '_ u _ _ p _ n']
302564


In [7]:

#Function to create dataLoader 
#Returns the dataLoader, word2index and index2word dictionaries for embedding, and vocab_size for model init 
    
def createLoader(wordset, batch_size, all_masked):

    #Build out smaller lists for specified set 
    definitions = [getFirstDef(word) for word in wordset]
    tokenized_defs = [tokenizer(phrase) for phrase in definitions]

    labels_indices = [word2index[word] for word in wordset]
    
    masked_indices = [word2index[all_masked[i]] for i in range(len(wordset))]

    definitions_indices = []
    for definition in tokenized_defs: 
        templist = []
        for word in definition: 
            templist.append(word2index[word])
        definitions_indices.append(templist)

    #Introduce padding here 
    #Determine the longest definitional sequence
    maxDef = 0 
    for sequence in definitions_indices: 
        if len(sequence) > maxDef: 
            maxDef = len(sequence)

    #Pad with 0's
    for sequence in definitions_indices: 
        while maxDef - len(sequence) > 0: 
            sequence.append(0)
    # Convert data to PyTorch tensors
    labels_tensor = torch.tensor(labels_indices, dtype=torch.long)
    masked_tensor = torch.tensor(masked_indices, dtype=torch.long)
    definitions_tensor = torch.tensor(definitions_indices, dtype=torch.long)

    dataset = TensorDataset(definitions_tensor, masked_tensor, labels_tensor)
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    return loader


In [8]:
#Model Architecture
from transformers import BertModel, BertTokenizer


class predictMasked(torch.nn.Module):
    def __init__(self, lstm_dim, masked_vocab_size, embed_dim):
        super(predictMasked, self).__init__()
        
        # BERT model for processing the definition
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        
        # An embedding layer for the masked word
        self.masked_word_embedding = torch.nn.Embedding(masked_vocab_size, embed_dim)
        
        # LSTM layer
        self.lstm = torch.nn.LSTM(self.bert.config.hidden_size + embed_dim, lstm_dim, batch_first=True, bidirectional=True)

        #Hidden layer for processing concatenation? 
        
        # Classification layer
        self.fc = torch.nn.Linear(2 * lstm_dim, vocab_size)

    def forward(self, input_ids, attention_mask, masked_word_ids):
        # Process the definition through BERT and then lstm to adjust dimension 
        bert_output = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        definition_embedding = bert_output.last_hidden_state
        
        # Embed the masked word
        masked_word_embedding = self.masked_word_embedding(masked_word_ids)
        
        # Combine the embeddings (simple concatenation here)
        combined_embedding = torch.cat((definition_embedding, masked_word_embedding), dim=1)
        
        # Process the combined embedding through LSTM
        lstm_output, _ = self.lstm(combined_embedding)
        
        # Use the output for prediction
        x = self.fc(lstm_output)  
        
        return x


#Instantiate dataLoader of training set - not sure if this creation is needed anymore
batch_size = 2000
testLoader = createLoader(trainset, batch_size, all_masked)
validationLoader = createLoader(validation_set, 1, all_masked)


In [9]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
definitions = [getFirstDef(word) for word in trainset]
masked_words = [maskWord(word) for word in trainset]
inputs = tokenizer(definitions, padding=True, truncation=True, return_tensors="pt")
BERT_len = len(inputs['input_ids'][1])
targets = tokenizer(trainset, padding = 'max_length', truncation = True, return_tensors = "pt", max_length = BERT_len)

masked_vocab_size  = len(masked_words)
masked_word2index = {}
index2masked_word = {}
for i in range(masked_vocab_size): 
    masked_word2index[masked_words[i]] = i
    index2masked_word[i] = masked_words[i]

masked_indices = [[masked_word2index[masked_words[i]]] for i in range(masked_vocab_size)]

#Pad indexes to match BERT tokenization 

for masked_index in masked_indices: 
    while BERT_len - len(masked_index) > 0: 
        masked_index.append(0)

masked_tensor = torch.tensor(masked_indices)


In [10]:
batch_size = 32
dataset = (inputs['input_ids'], inputs['attention_mask'], masked_tensor, targets['input_ids'])
loader = DataLoader(dataset, batch_size = batch_size, shuffle = True)


In [11]:
#Validation loader setup 

val_definitions = [getFirstDef(word) for word in validation_set]
val_masked_words = [maskWord(word) for word in validation_set]
val_inputs = tokenizer(val_definitions, padding=True, truncation=True, return_tensors="pt")
val_BERT_len = len(val_inputs['input_ids'][1])
val_targets = tokenizer(validation_set, padding = 'max_length', truncation = True, return_tensors = "pt", max_length = val_BERT_len)


#Do I need to make these together with the validation set and have one large set like when I did the LSTM myself? 
val_masked_vocab_size  = len(val_masked_words)
val_masked_word2index = {}
val_index2masked_word = {}
for i in range(val_masked_vocab_size): 
    val_masked_word2index[masked_words[i]] = i
    val_index2masked_word[i] = masked_words[i]

val_masked_indices = [[val_masked_word2index[masked_words[i]]] for i in range(val_masked_vocab_size)]

#Pad indexes to match BERT tokenization 
for masked_index in val_masked_indices: 
    while val_BERT_len - len(masked_index) > 0: 
        masked_index.append(0)

val_masked_tensor = torch.tensor(val_masked_indices)

In [12]:
val_dataset = (val_inputs['input_ids'], val_inputs['attention_mask'], val_masked_tensor, val_targets['input_ids'])
validationLoader = DataLoader(val_dataset, batch_size = batch_size, shuffle = True)

In [13]:
#Hyperparameters, model and loss function init
embed_dim = 50
lstm_dim = 64

model = predictMasked(lstm_dim, masked_vocab_size, embed_dim)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay = 0.00005)

#Training loop, make sure to update epochs to be 10 eventually and fine-tune 
epochs = 1
for epoch in range(epochs):
    total_loss = 0.0
    model.train()
    for input_ids, attention_mask, masked_index, target_id in loader:
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask, masked_index)
        loss = criterion(outputs, target_id)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    
    average_loss = total_loss / batch_size
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {average_loss:.4f}')
    
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for val_defs, val_masked, val_labels in validationLoader:
            val_outputs = model(val_masked, val_defs)
            val_loss = criterion(val_outputs, val_labels)
            total_val_loss += val_loss.item()
    
    average_val_loss = total_val_loss / len(validationLoader)
    print(f'Validation Loss: {average_val_loss:.4f}')


: 

In [None]:
# Optimize hyperparamters here; how to tune s