In [19]:
import torch.nn as nn
import torch
import numpy as np

import torch.nn.functional as F
import re
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim

from tqdm import tqdm

from datasets import load_dataset

import gensim

In [23]:
train_data = load_dataset("Salesforce/wikitext", "wikitext-2-raw-v1")

train = train_data['train']['text']
valid = train_data['validation']['text']
test = train_data['test']['text']


# Step 2: Tokenize the text using regular expressions
def tokenize_with_re(data):
    tokenized_sentences = [
        re.findall(r'\b[a-zA-Z]+\b', sentence.lower()) for sentence in data if sentence.strip()
    ]
    return tokenized_sentences

# Tokenize each dataset split
train_tokenized = tokenize_with_re(train)
valid_tokenized = tokenize_with_re(valid)
test_tokenized = tokenize_with_re(test)

flat_train = [item for sublist in train_tokenized for item in sublist]
flat_valid = [item for sublist in valid_tokenized for item in sublist]
flat_test = [item for sublist in test_tokenized for item in sublist]

flat_total = flat_train + flat_valid + flat_test
words_available = sorted(set(flat_total))
vocab_size = len(words_available)

stoi = {word: i for i, word in enumerate(words_available)}
itos = {i: word for i, word in enumerate(words_available)}

print(f"Total number of words: {len(flat_total)}, vocabulary size: {len(words_available)}")

Total number of words: 2053065, vocabulary size: 66929


In [97]:
def create_sequences(data, seq_length, stoi):
    sequences = []
    targets = []
    
    for i in tqdm(range(len(data) - seq_length)):
        # Extract sequence and target
        seq = data[i:i+seq_length]
        target = data[i+1:i+seq_length+1]
        
        # Convert each word in the sequence and target to indices using stoi
        seq_indices = [stoi.get(word, 0) for word in seq]
        target_indices = [stoi.get(word, 0) for word in target]
        
        # Only add sequences and targets of the desired length
        if len(seq_indices) == seq_length and len(target_indices) == seq_length:
            sequences.append(seq_indices)
            targets.append(target_indices)
    
    return sequences, targets


In [101]:
word2vec = gensim.models.Word2Vec.load('wikitext_small_word2vec.model')
stoi = {word: idx for idx, word in enumerate(word2vec.wv.index_to_key)}
itos = {idx: word for idx, word in enumerate(word2vec.wv.index_to_key)}

# Example usage:
seq_length = 50
sequences, targets = create_sequences(flat_train, seq_length, stoi)


class Word2VecDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = sequences
        self.targets = targets

    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.long), torch.tensor(self.targets[idx], dtype=torch.long)

batch_size = 64
dataset = Word2VecDataset(sequences, targets)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


[95, 0, 1381, 841, 42069, 14120, 0, 59, 116, 334, 3, 2481, 55, 5, 157, 1431, 1, 0, 120, 1129, 7, 4182, 3896, 293, 54, 17, 2643, 0, 1188, 519, 1, 0, 90, 17, 33, 3689, 1631, 13986, 80, 8, 366, 0, 59, 51, 41530, 11, 90, 17438, 263, 3647]


In [90]:
class LanguageModelOneHot(nn.Module):
    def __init__(self, vocab_size, hidden_dim, num_layers, rnn_type='LSTM'):
        super(LanguageModelOneHot, self).__init__()
        self.vocab_size = vocab_size
        if rnn_type == 'LSTM':
            self.rnn = nn.LSTM(vocab_size, hidden_dim, num_layers, batch_first=True) # [batch_size, seq_length, vocab_size] -
        elif rnn_type == 'GRU':
            self.rnn = nn.GRU(vocab_size, hidden_dim, num_layers, batch_first=True)
        else:
            raise ValueError("Invalid RNN type. Choose 'LSTM' or 'GRU'.")
        
        self.fc = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, x, hidden):
        embeds =  F.one_hot(x, num_classes=self.vocab_size).float()
        out, hidden = self.rnn(embeds, hidden)
        out = out.reshape(-1, out.size(2))
        out = self.fc(out)
        return out, hidden
    

class LanguageModelWord2Vec(nn.Module):
    def __init__(self, vocab_size, hidden_dim, num_layers, rnn_type='LSTM',  word2vec_path='wikitext_small_word2vec.model'):
        super(LanguageModelWord2Vec, self).__init__()
        self.hidden_dim = hidden_dim
        
        # Step 1: Load the Gensim Word2Vec model
        self.word2vec = gensim.models.Word2Vec.load(word2vec_path)
        
        # Get the embedding dimensions from the Gensim model
        embedding_dim = self.word2vec.wv.vector_size

        # Step 2: Initialize the embedding layer with pretrained Word2Vec weights
        
        weights = torch.FloatTensor(self.word2vec.wv.vectors)
        self.embedding = nn.Embedding.from_pretrained(weights)
        
        # Step 3: Initialize RNN (LSTM or GRU)
        if rnn_type == 'LSTM':
            self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True)
        elif rnn_type == 'GRU':
            self.rnn = nn.GRU(embedding_dim, hidden_dim, num_layers, batch_first=True)
        else:
            raise ValueError("Invalid RNN type. Choose 'LSTM' or 'GRU'.")
        
        self.fc = nn.Linear(hidden_dim, vocab_size)

    
    def forward(self, x, hidden):
        # x shape: (batch_size, seq_length)
        embeds = self.embedding(x)
        # embeds shape: (batch_size, seq_length, embedding_dim)
        out, hidden = self.rnn(embeds, hidden)
        # out shape: (batch_size, seq_length, hidden_dim)
        out = out.reshape(-1, self.hidden_dim)
        # out shape: (batch_size * seq_length, hidden_dim)
        out = self.fc(out)
        # out shape: (batch_size * seq_length, vocab_size)
        return out, hidden
    
model = LanguageModelWord2Vec(vocab_size, 128, 2, rnn_type='LSTM', word2vec_path='wikitext_small_word2vec.model')

In [None]:
# Initialize the model
hidden_dim = 256  

num_layers = 3
rnn_type = 'LSTM' # Choose 'LSTM' or 'GRU'

# model = LanguageModelOneHot(vocab_size, hidden_dim, num_layers, rnn_type)
model = LanguageModelWord2Vec(vocab_size, hidden_dim,  num_layers, rnn_type)


# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0003)


# Move model to device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'mps')

model.to(device)

# Training loop
num_epochs = 3  # Adjust as needed
for epoch in range(num_epochs):
    
    model.train()
    hidden = None
    total_loss = 0
    
    for inputs, targets in tqdm(dataloader):
        hidden = None
        inputs, targets = inputs.to(device), targets.to(device)
        
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        outputs, hidden = model(inputs, hidden)
        
        
        # Detach hidden states to prevent backpropagating through the entire training history
        if isinstance(hidden, tuple):
            hidden = tuple([h.detach() for h in hidden])
        else:
            hidden = hidden.detach()
        
        outputs = outputs.view(-1, vocab_size) 
        targets = targets.view(-1)     
        
        # Compute loss
        loss = criterion(outputs, targets)
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    avg_loss = total_loss / len(dataloader)
    torch.save(model.state_dict(), f'./model_{epoch}.pt')

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')


In [6]:
def generate_text(model, seed_word, max_length=50, random_sampling=False):
    model.eval()  # Set the model to evaluation mode
    generated_words = [seed_word]  # List to store generated words
    
    # Convert seed word to index
    seed_idx = torch.tensor([[stoi[seed_word]]]).to(device)  # Shape: (1, 1)

    # Initialize hidden state
    hidden = None
    
    # Loop through to generate words
    for _ in range(max_length):
        # Forward pass through the model
        with torch.no_grad():
            output, hidden = model(seed_idx, hidden)
        
        # Get the predicted word (highest probability or sample)
        output = output.squeeze(1)  # Remove the seq_len dimension (now (1, vocab_size))
        if random_sampling:
            # Sample from the output distribution
            probabilities = torch.softmax(output, dim=1)
            predicted_idx = torch.multinomial(probabilities, num_samples=1).item()
        else:
            predicted_idx = torch.argmax(output, dim=1).item()  # Get the index of the word with highest probability
        
        # Convert index back to word
        predicted_word = itos[predicted_idx]
        
        # Append the predicted word to the list
        generated_words.append(predicted_word)
        
        # Set the predicted word as the next input (shape: (1, 1))
        seed_idx = torch.tensor([[predicted_idx]]).to(device)
        
        # Stop if an end-of-sequence token is generated (optional)
        if predicted_word == "<eos>":  # Assuming "<eos>" is the token for end of sentence
            break
    
    return ' '.join(generated_words)

# Example usage
generated_text = generate_text(model, seed_word="Harry", max_length=50, random_sampling=True)
print(generated_text)


Harry , and - another Thousand inside the dormitory . He was looking very around by a splash , and Lee did not showing him Crookshanks and went about to know I had been in a former relief . “ The skrewts with Household … Had I want to have an


In [None]:
fasttext = gensim.models.FastText.load('wikitext_small_fasttext.model')
stoi = {word: idx for idx, word in enumerate(fasttext.wv.index_to_key)}
itos = {idx: word for idx, word in enumerate(fasttext.wv.index_to_key)}

# Example usage:
seq_length = 50
sequences, targets = create_sequences(flat_train, seq_length, stoi)


class Word2VecDataset(Dataset):
    def __init__(self, sequences, targets):
        self.sequences = sequences
        self.targets = targets

    def __len__(self):
        return len(self.sequences)
    
    def __getitem__(self, idx):
        return torch.tensor(self.sequences[idx], dtype=torch.long), torch.tensor(self.targets[idx], dtype=torch.long)

batch_size = 64
dataset = Word2VecDataset(sequences, targets)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
