In [None]:
    ## imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import numpy as np

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MaskedLanguageModel(nn.Module):
    """Character-level LSTM classifier over a fixed vocabulary.

    A batch of index sequences is embedded, passed through a stack of
    single-layer LSTMs, and the hidden state at the final time step is
    projected onto one score per vocabulary entry.

    Args:
        input_size: Number of vocabulary entries (embedding rows and output scores).
        hidden_size: Width of the embedding and of every LSTM layer.
        num_layers: How many LSTM layers are chained together.
    """

    def __init__(self, input_size, hidden_size, num_layers=3):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Embedding layer: one learned vector per vocabulary index.
        self.embedding = nn.Embedding(input_size, hidden_size)

        # Independent single-layer LSTMs, applied one after another in forward().
        self.lstms = nn.ModuleList(
            [nn.LSTM(hidden_size, hidden_size, batch_first=True) for _ in range(num_layers)]
        )

        # Output layer: hidden state -> one score per vocabulary entry.
        self.fc = nn.Linear(hidden_size, input_size)
        # Only used by predict_proba(); forward() deliberately stays in logit space.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input):
        """Return unnormalised scores (logits) of shape (batch, input_size).

        Args:
            input: Integer tensor of shape (batch, seq_len) holding vocabulary indices.

        Returns:
            Raw logits. No softmax is applied here because losses such as
            nn.CrossEntropyLoss apply log-softmax themselves; normalising twice
            flattens the distribution and weakens the gradients.
        """
        output = self.embedding(input)

        # Each LSTM consumes the full output sequence of the previous one.
        for lstm in self.lstms:
            output, _ = lstm(output)

        # Only the last time step feeds the classifier.
        return self.fc(output[:, -1, :])

    def predict_proba(self, input):
        """Return per-class probabilities (softmax over the logits of forward())."""
        return self.softmax(self(input))

# Demo: build a model with illustrative dimensions and print its layer layout.
# Order of the values: vocabulary size, hidden width, number of LSTM layers.
input_size, hidden_size, num_layers = 100, 128, 3

model = MaskedLanguageModel(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
print(model)


In [None]:
class CustomDataset(Dataset):
    """Dataset of (encoded masked word, soft target over the vocabulary) pairs.

    Args:
        masked_words: Sequence of words in which some characters were replaced.
        original_words: The unmasked words, aligned index-by-index with masked_words.
        char_to_index: Mapping from character to vocabulary index; its length
            is used as the size of the target vector.
    """

    def __init__(self, masked_words, original_words, char_to_index):
        self.masked_words = masked_words
        self.original_words = original_words
        self.char_to_index = char_to_index

    def __len__(self):
        """Number of samples, taken from the masked word list."""
        return len(self.masked_words)

    def __getitem__(self, idx):
        """Return (index tensor of the masked word, normalised target weights).

        The target counts, per vocabulary entry, how many positions of the
        original word differ from the masked word and hold that character,
        divided by the number of differing positions. When nothing differs
        the target is uniform over the vocabulary.
        """
        masked = self.masked_words[idx]
        original = self.original_words[idx]
        lookup = self.char_to_index
        vocab_size = len(lookup)

        # Encode the masked word character by character.
        encoded = [lookup[ch] for ch in masked]

        # Tally the original characters at every position that was altered.
        counts = [0] * vocab_size
        hidden = 0
        for pos, ch in enumerate(original):
            if ch == masked[pos]:
                continue
            counts[lookup[ch]] += 1
            hidden += 1

        if hidden:
            target = [count / hidden for count in counts]
        else:
            # Nothing was masked: fall back to a uniform distribution
            # instead of dividing by zero.
            target = [1 / vocab_size] * vocab_size

        soft_target = torch.tensor(target)
        return torch.tensor(encoded), soft_target

In [None]:
# Collate function for DataLoader
def collate_fn(batch):
    """Merge (index tensor, target tensor) samples into two batch tensors.

    Index tensors are right-padded with 0 up to the longest one in the batch
    and stacked; target tensors are stacked as they are.

    Returns:
        Tuple of (padded index batch, stacked target batch).
    """
    inputs, targets = zip(*batch)
    longest = max(len(seq) for seq in inputs)

    padded = []
    for seq in inputs:
        missing = longest - len(seq)
        padded.append(torch.nn.functional.pad(seq, (0, missing), value=0))

    return torch.stack(padded), torch.stack(targets)

In [None]:
import random

def generate_masked_words(original_words, mask_prob=0.6, rng=None):
    """Replace letters of each word with '_' at random.

    The first alphabetic character of every word is always kept and
    non-alphabetic characters are copied unchanged. Every other letter is
    replaced by '_' when a uniform draw from [0, 1) is <= mask_prob.

    Args:
        original_words: Iterable of strings to mask.
        mask_prob: Threshold compared against each random draw.
        rng: Optional object with a ``random()`` method (e.g. a seeded
            ``random.Random``) for reproducible masks. Defaults to the
            module-level ``random`` generator.

    Returns:
        List of masked strings, one per input word, each the same length as
        its source word.
    """
    draw = random.random if rng is None else rng.random

    masked_words = []
    for word in original_words:
        pieces = []
        first_letter_seen = False
        for char in word:
            if not char.isalpha():
                pieces.append(char)
            elif not first_letter_seen:
                # The leading letter is never masked.
                pieces.append(char)
                first_letter_seen = True
            elif draw() <= mask_prob:
                pieces.append('_')
            else:
                pieces.append(char)
        masked_words.append(''.join(pieces))
    return masked_words
  


# Load the training vocabulary and build its masked counterpart.
file_path = '/kaggle/input/training-words/training.txt'  # Text file with one word per line

# Read the file and extract words. Blank lines are dropped so that no empty
# word reaches the dataset.
with open(file_path, 'r') as file:
    words = [stripped for stripped in (line.strip() for line in file) if stripped]

# The full word list is used for training.
original_words = words

# Generate masked words
masked_words = generate_masked_words(original_words)

# Show only a short preview; printing every word would flood the cell output.
preview_count = 10
print("Number of words:", len(original_words))
print("Original Words:", original_words[:preview_count])
print("Masked Words:", masked_words[:preview_count])

In [None]:
# Character vocabulary used to encode words and to decode predictions.
# Illustrative pair this encoding is applied to: masked 'a__l_' / original 'apple'.
# Layout: 'a'..'z' -> 1..26, '_' -> 27, '-' -> 0.
char_to_index = {chr(code): code - 96 for code in range(ord('a'), ord('z') + 1)}
char_to_index['_'] = 27
char_to_index['-'] = 0

# Reverse lookup so indices can be converted back into characters.
index_to_char = {index: char for char, index in char_to_index.items()}

In [None]:
# Training function
def train(model, dataloader, criterion, optimizer, epochs):
    """Optimise ``model`` for ``epochs`` passes over ``dataloader``.

    After every epoch the mean batch loss is printed.

    Args:
        model: Module called as ``model(inputs)``.
        dataloader: Iterable yielding ``(inputs, targets)`` tensor pairs.
        criterion: Callable ``criterion(output, targets)`` returning a scalar loss.
        optimizer: Optimizer stepping the model's parameters.
        epochs: Number of passes over ``dataloader``.
    """
    # Batches are moved to wherever the model's parameters live, so the loop
    # also works after ``model.to("cuda")``. A parameter-less model leaves
    # the batches untouched.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    model.train()
    for epoch in range(epochs):
        running_loss = 0.0
        num_batches = 0
        for masked_words, weights_chars in tqdm(dataloader, desc=f'Epoch {epoch + 1}/{epochs}', unit='batch'):
            if device is not None:
                masked_words = masked_words.to(device)
                weights_chars = weights_chars.to(device)
            optimizer.zero_grad()
            output = model(masked_words)
            loss = criterion(output, weights_chars)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        # Counting batches (rather than calling len()) supports iterables
        # without a length and avoids dividing by zero on an empty loader.
        mean_loss = running_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch + 1}/{epochs}, Loss: {mean_loss}")

In [None]:
import torch

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("GPU available, using GPU." if device.type == "cuda" else "GPU not available, using CPU.")

In [None]:
### Hyperparameters
input_size = len(char_to_index)  # one embedding row / output score per vocabulary entry
hidden_size = 10
batch_size = 32
epochs = 2000
learning_rate = 0.001
import torch.nn.functional as F

# Model, loss and optimizer.
# The loss receives the per-character weight vectors built by CustomDataset as targets.
model = MaskedLanguageModel(input_size=input_size, hidden_size=hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)

# Batches of (zero-padded masked word indices, target weights).
dataset = CustomDataset(
    masked_words=masked_words,
    original_words=original_words,
    char_to_index=char_to_index,
)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, collate_fn=collate_fn)

# Run the optimisation loop.
train(model=model, dataloader=dataloader, criterion=criterion, optimizer=optimizer, epochs=epochs)

In [None]:
# Bare expression: the notebook echoes the model's repr (its layer layout) as the cell output.
model

In [None]:
# Testing function
def test(model, dataloader, char_to_index, index_to_char):
    model.eval()
    with torch.no_grad():
        for masked_words, original_words in dataloader:
            output = model(masked_words)
            print(output)
            char_ind = [np.argmax(output[i].detach().numpy()) for i in range(output.shape[0])]
            predictions = [index_to_char[index] for index in char_ind]
            print(predictions)

In [None]:
# Small hand-written evaluation set: masked inputs and the words they came from.
masked_words_test = ['c__k', 'b_t']
original_words_test = ['cook', 'bat']

# Wrap the samples in a DataLoader that uses the same batching as training.
dataset = CustomDataset(
    masked_words=masked_words_test,
    original_words=original_words_test,
    char_to_index=char_to_index,
)
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, collate_fn=collate_fn)

# Print the model output and decoded prediction for each sample.
test(model=model, dataloader=dataloader, char_to_index=char_to_index, index_to_char=index_to_char)

In [None]:
# Persist only the model's state_dict (not the module object) to 'model.pth',
# a path relative to the current working directory.
torch.save(model.state_dict(), 'model.pth')