# LSTM Training on Penn Treebank Dataset
This notebook demonstrates how to train an LSTM model for language modeling using the Penn Treebank dataset in PyTorch.

In [None]:
import torch
import torch.nn as nn
import os
from collections import Counter

def read_data(file_path):
    """Load a UTF-8 text file and return its content as a list of lines.

    Whitespace at the very start and end of the file is stripped before
    the content is split on newline characters, so an empty file yields
    a list holding a single empty string.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    lines = contents.strip().split('\n')
    return lines

def tokenize(text):
    """Return the lowercase, whitespace-delimited tokens of ``text``."""
    lowered = text.lower()
    tokens = lowered.split()
    return tokens

# Locations of the training and validation splits
train_path = 'data/ptb.train.txt'
valid_path = 'data/ptb.valid.txt'

# Load each split as a list of lines (training first, then validation)
train_data, valid_data = (read_data(path) for path in (train_path, valid_path))

# Build vocabulary
def build_vocab(text_data, min_freq=2):
    """Build word <-> index lookup tables from an iterable of text lines.

    The special tokens ``<unk>``, ``<pad>``, ``<bos>`` and ``<eos>`` always
    occupy indices 0-3. Every other token that occurs at least ``min_freq``
    times follows, in order of first appearance.

    Args:
        text_data: Iterable of strings; each one is split with ``tokenize``.
        min_freq: Minimum number of occurrences for a token to be kept.

    Returns:
        A ``(word_to_idx, idx_to_word)`` pair of dicts that are exact
        inverses of each other, with indices forming the contiguous range
        ``0 .. len(word_to_idx) - 1``.
    """
    special_tokens = ['<unk>', '<pad>', '<bos>', '<eos>']

    counter = Counter()
    for line in text_data:
        counter.update(tokenize(line))

    # A corpus token that coincides with a special token (e.g. a literal
    # '<unk>' in the text) must not be appended a second time. A duplicate
    # would overwrite the reserved index in word_to_idx and leave
    # len(word_to_idx) smaller than the largest index, which breaks any
    # embedding table sized from the vocabulary length.
    reserved = set(special_tokens)
    words = special_tokens + [
        word
        for word, count in counter.items()
        if count >= min_freq and word not in reserved
    ]

    word_to_idx = {word: idx for idx, word in enumerate(words)}
    idx_to_word = dict(enumerate(words))
    return word_to_idx, idx_to_word

# The vocabulary is derived from the training split only (default min_freq)
word_to_idx, idx_to_word = build_vocab(text_data=train_data)

# Number of distinct entries, special tokens included
vocab_size = len(word_to_idx)
print(f'Vocabulary size: {vocab_size}')



Output (appears to be stale: the cell above does not import torchtext):
OSError: /home/hvaidya/miniconda3/envs/if/lib/python3.12/site-packages/torchtext/lib/libtorchtext.so: undefined symbol: _ZN5torch6detail10class_baseC2ERKSsS3_SsRKSt9type_infoS6_

In [None]:
# Process data into tensors
def process_data(data, word_to_idx):
    """Turn lines of text into a list of 1-D index tensors.

    Each line is tokenized, every token is mapped through ``word_to_idx``
    (falling back to the ``<unk>`` index for unknown tokens), and the
    resulting sequence is wrapped in ``<bos>`` ... ``<eos>`` indices.
    """

    def encode(line):
        # Out-of-vocabulary tokens are replaced by the <unk> index
        body = [word_to_idx.get(tok, word_to_idx['<unk>']) for tok in tokenize(line)]
        # Frame the sentence with its begin/end markers
        framed = [word_to_idx['<bos>'], *body, word_to_idx['<eos>']]
        return torch.tensor(framed)

    return [encode(line) for line in data]

# Encode both splits with the training vocabulary (training first)
train_tensors, valid_tensors = (
    process_data(split, word_to_idx) for split in (train_data, valid_data)
)

Output (appears to be stale: the cell above does not use LanguageModelingDataset):
TypeError: LanguageModelingDataset.__init__() missing 2 required positional arguments: 'path' and 'text_field'

In [None]:
# Create batches
def create_batches(data_tensors, batch_size, pad_idx=None):
    """Group variable-length 1-D tensors into padded 2-D batch tensors.

    Sequences are ordered from longest to shortest (ties keep their input
    order) so that each batch holds sequences of similar length, then cut
    into consecutive groups of ``batch_size``. Within a group, shorter
    sequences are right-padded to the length of the longest one.

    Args:
        data_tensors: List of 1-D index tensors. The list is not modified.
        batch_size: Maximum number of sequences per batch.
        pad_idx: Index used for padding. When ``None`` (the default) the
            module-level ``word_to_idx['<pad>']`` is used.

    Returns:
        A list of tensors of shape ``(sequences_in_batch, longest_length)``;
        an empty list when ``data_tensors`` is empty.
    """
    if pad_idx is None:
        pad_idx = word_to_idx['<pad>']

    # sorted() instead of list.sort(): the caller's list keeps its order
    ordered = sorted(data_tensors, key=len, reverse=True)

    batches = []
    for start in range(0, len(ordered), batch_size):
        chunk = ordered[start:start + batch_size]
        # pad_sequence right-pads every sequence to the longest in the chunk
        # and stacks them along a new leading batch dimension
        batches.append(
            torch.nn.utils.rnn.pad_sequence(
                chunk, batch_first=True, padding_value=pad_idx
            )
        )
    return batches

batch_size = 20

# Batch both splits with the same batch size (training first)
train_batches, valid_batches = (
    create_batches(tensors, batch_size)
    for tensors in (train_tensors, valid_tensors)
)

print(f'Number of training batches: {len(train_batches)}')
print(f'Number of validation batches: {len(valid_batches)}')

In [None]:
# Define LSTM Model
class LSTMModel(nn.Module):
    """Embedding -> multi-layer LSTM -> linear projection language model.

    Args:
        vocab_size: Number of rows in the embedding table and width of the
            output logits.
        embed_size: Dimensionality of the token embeddings.
        hidden_size: Number of hidden units per LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the embeddings and to the
            LSTM output; also used between LSTM layers when
            ``num_layers > 1``.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers, dropout=0.5):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(
            embed_size,
            hidden_size,
            num_layers,
            batch_first=True,
            # Inter-layer dropout is only meaningful with stacked layers
            dropout=dropout if num_layers > 1 else 0,
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.hidden_size = hidden_size
        self.num_layers = num_layers

    def forward(self, x, hidden=None):
        """Compute next-token logits for a batch of index sequences.

        Args:
            x: Integer tensor of shape ``(batch_size, seq_length)``.
            hidden: Optional ``(h, c)`` state from a previous call. When
                ``None``, the LSTM starts from an all-zero state.

        Returns:
            ``(logits, hidden)`` where ``logits`` has shape
            ``(batch_size, seq_length, vocab_size)`` and ``hidden`` is the
            final ``(h, c)`` state of the LSTM.
        """
        embeds = self.dropout(self.embed(x))  # (batch_size, seq_length, embed_size)
        # nn.LSTM builds the zero initial state itself when hidden is None,
        # matching the dtype and device of its input. A hand-built
        # torch.zeros state is always float32 and breaks a model cast with
        # .double() or .half().
        output, hidden = self.lstm(embeds, hidden)
        output = self.dropout(output)
        logits = self.fc(output)
        return logits, hidden

# Hyperparameters for the language model
embed_size = 200
hidden_size = 200
num_layers = 2

# Build the network first, then the loss and optimizer that depend on it
model = LSTMModel(
    vocab_size=vocab_size,
    embed_size=embed_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
)
# Padding positions are excluded from the loss
criterion = nn.CrossEntropyLoss(ignore_index=word_to_idx['<pad>'])
optimizer = torch.optim.Adam(params=model.parameters())

print(model)

In [None]:
# Training function
def train_epoch(model, train_batches, criterion, optimizer):
    """Run one optimisation pass over ``train_batches``.

    For every batch the model predicts token ``t + 1`` from tokens
    ``0 .. t``: the input is the batch without its last column and the
    target is the batch without its first column.

    Args:
        model: Module that is called as ``model(inputs)`` and returns
            ``(logits, state)``, with logits whose last dimension is the
            number of classes.
        train_batches: Sized iterable of 2-D integer tensors indexed as
            ``batch[:, position]``.
        criterion: Loss called as ``criterion(flat_logits, flat_targets)``.
        optimizer: Optimizer that updates the model's parameters.

    Returns:
        The mean of the per-batch losses as a float, or ``0.0`` when
        ``train_batches`` is empty.
    """
    model.train()
    if not train_batches:
        # Nothing to train on; avoid dividing by zero below
        return 0.0

    total_loss = 0.0
    for batch in train_batches:
        optimizer.zero_grad()
        # Input is all tokens except last, target is all tokens except first
        inputs = batch[:, :-1]
        targets = batch[:, 1:]
        outputs, _ = model(inputs)
        # Flatten to (tokens, classes). The class count is read from the
        # logits themselves so it always matches the model being trained
        # rather than relying on a module-level vocab_size.
        loss = criterion(outputs.reshape(-1, outputs.size(-1)), targets.reshape(-1))
        loss.backward()
        # Clip the global gradient norm to 0.5 before the update
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_batches)

# Generate text using the trained model
def generate_text(model, start_words=None, max_length=50):
    """Greedily generate text from ``model``, starting with ``start_words``.

    The start words are fed to the model in one call; after that the single
    most likely next token is appended and fed back, one token at a time,
    until ``<eos>`` is predicted or ``max_length`` tokens have been added.
    Words are mapped through the module-level ``word_to_idx`` and
    ``idx_to_word`` tables.

    Args:
        model: Module called as ``model(ids, hidden)`` returning
            ``(logits, hidden)``.
        start_words: Iterable of prompt words. Defaults to ``['the']``.
            Words missing from the vocabulary are fed as ``<unk>``.
        max_length: Maximum number of tokens to generate after the prompt.

    Returns:
        The prompt followed by the generated words, joined by single spaces.

    Raises:
        ValueError: If ``start_words`` is empty.
    """
    # None sentinel instead of a mutable list as the default argument
    if start_words is None:
        start_words = ['the']
    # Copy so the caller's sequence is never modified
    generated_words = list(start_words)
    if not generated_words:
        # An empty prompt would otherwise fail inside the embedding lookup
        raise ValueError('start_words must contain at least one word')

    model.eval()
    with torch.no_grad():
        unk_idx = word_to_idx['<unk>']
        # Shape (1, prompt_length): a batch holding the single prompt
        current_ids = torch.tensor(
            [[word_to_idx.get(word, unk_idx) for word in generated_words]]
        )
        hidden = None

        for _ in range(max_length):
            output, hidden = model(current_ids, hidden)
            # Get the most likely next word from the last time step
            next_word_idx = output[0, -1].argmax().item()
            next_word = idx_to_word[next_word_idx]
            if next_word == '<eos>':
                break
            generated_words.append(next_word)
            # Only the new token is fed next; context lives in `hidden`
            current_ids = torch.tensor([[next_word_idx]])

    return ' '.join(generated_words)

# Run the training schedule, printing a greedy sample after every 2nd epoch
num_epochs = 10
for epoch in range(num_epochs):
    loss = train_epoch(model, train_batches, criterion, optimizer)
    print(f'Epoch {epoch+1}, Loss: {loss:.4f}')

    # epoch is zero-based, so odd values correspond to epochs 2, 4, 6, ...
    if epoch % 2 != 1:
        continue
    print('\nGenerated text:')
    print(generate_text(model, start_words=['the']))