In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [4]:
# Mount files
from google.colab import drive
drive.mount('/content/drive')

# Load data
def read_data(file_name):
    with open(file_name, 'r') as f:
        return f.read().replace('\n', '<eos>').split()

# Load PTB data
path = '/content/drive/MyDrive/Colab Notebooks/data/'
train_text = read_data(path + 'ptb.train.txt')
valid_text = read_data(path + 'ptb.valid.txt')
test_text = read_data(path + 'ptb.test.txt')

print(train_text[:10])
print(valid_text[:10])
print(test_text[:10])

Mounted at /content/drive
['aer', 'banknote', 'berlitz', 'calloway', 'centrust', 'cluett', 'fromstein', 'gitano', 'guterman', 'hydro-quebec']
['consumers', 'may', 'want', 'to', 'move', 'their', 'telephones', 'a', 'little', 'closer']
['no', 'it', 'was', "n't", 'black', 'monday', '<eos>', 'but', 'while', 'the']


In [5]:
# Build vocabulary
def build_vocab(data):
    vocab = {word: idx for idx, word in enumerate(set(data))}
    return vocab

vocab = build_vocab(train_text)
vocab_size = len(vocab)
print(f"Vocabulary size: {vocab_size}")
print(vocab.keys())


# Tokenize and pad sequences
def tokenize_and_pad(text, vocab, sequence_length):
    tokenized = [vocab[word] if word in vocab else vocab['<unk>'] for word in text]
    sequences = [tokenized[i:i+sequence_length] for i in range(0, len(tokenized), sequence_length)]
    padded_sequences = [seq + [vocab['<pad>']] * (sequence_length - len(seq)) if len(seq) < sequence_length else seq for seq in sequences]
    return torch.tensor(padded_sequences)

# Example vocabulary (you should build this from your data)
vocab = {'<unk>': 0, '<pad>': 1}  # Add more words to the vocabulary

# Define sequence length
sequence_length = 5

# Tokenize and pad the data
train_data = tokenize_and_pad(train_text, vocab, sequence_length)
valid_data = tokenize_and_pad(valid_text, vocab, sequence_length)
test_data = tokenize_and_pad(test_text, vocab, sequence_length)

# # Function to convert text to tensor of indices
# def text_to_tensor(text, vocab):
#     return torch.tensor([vocab[word] for word in text if word in vocab], dtype=torch.long)

# train_data = text_to_tensor(train_text, vocab)
# valid_data = text_to_tensor(valid_text, vocab)
# test_data = text_to_tensor(test_text, vocab)

train_data.shape, valid_data.shape, test_data.shape

Vocabulary size: 10000


(torch.Size([929589]), torch.Size([73760]), torch.Size([82430]))

# Defining our models

In [15]:
# Model Definitions
class RNNModel(nn.Module):
    def __init__(self, rnn_type, vocab_size, embed_size, hidden_size, num_layers, dropout=0.0):
        super().__init__()
        self.num_layers = num_layers
        # embedding layer that maps each word (represented by an index) to a dense vector of fixed size(embed_size)
        # input size - [batch_size, sequence_length] and convert it to [batch_size, sequence_length, embed_size]
        self.embedding = nn.Embedding(vocab_size, embed_size)
        if rnn_type == 'LSTM':
          # batch_first=True -> input shape to be [batch_size, sequence_length, embed_size] instead of [sequence_length, batch_size, embed_size]
            self.rnn = nn.LSTM(embed_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        elif rnn_type == 'GRU':
            self.rnn = nn.GRU(embed_size, hidden_size, num_layers, dropout=dropout, batch_first=True)
        # convert the hidden state outputs of RNN into predictions for the next word
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, h):
        """
        x: [batch_size, sequence_length], input tensor(batch of tokenized sentences)
        h: [num_layers, batch_size, hidden_size], hidden state (and cell state)
        """
        x = self.embedding(x)
        # print(x.shape, type(x))
        x = self.dropout(x)
        out, h = self.rnn(x, h)
        out = self.fc(out)
        return out, h
    
    def init_hidden(self, batch_size):
      # Initialize hidden state (and cell state for LSTM)
      if isinstance(self.rnn, nn.LSTM):
          return (torch.zeros(self.num_layers, batch_size, self.rnn.hidden_size).to(device),
                  torch.zeros(self.num_layers, batch_size, self.rnn.hidden_size).to(device))
      else:
          return torch.zeros(self.num_layers, batch_size, self.rnn.hidden_size).to(device)



# def train_model(model, data, optimizer, criterion, batch_size):
#     model.train()
#     total_loss = 0
#     hidden = None
#     for i in range(0, len(data) - 1, batch_size):
#         inputs = data[i:i+batch_size]
#         targets = data[i+1:i+1+batch_size]
#         model.zero_grad()

#         # Initialize hidden state if it's None
#         if hidden is None:
#             hidden = model.rnn.init_hidden(batch_size)  # Ensure you define this method in your model

#         output, hidden = model(inputs.to(device), hidden.to(device))
#         loss = criterion(output.view(-1, vocab_size), targets.view(-1))
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()

#     return total_loss / len(data)


# # Evaluate function
# def evaluate_model(model, data, criterion, batch_size):
#     model.eval()
#     total_loss = 0
#     hidden = None
#     with torch.no_grad():
#         for i in range(0, len(data) - 1, batch_size):
#             inputs = data[i:i+batch_size]
#             targets = data[i+1:i+1+batch_size]
#             output, hidden = model(inputs.to(device), hidden.to(device))
#             loss = criterion(output.view(-1, vocab_size), targets.view(-1))
#             total_loss += loss.item()
#     return total_loss / len(data)

  
# Training function
def train_model(model, data, optimizer, criterion, batch_size):
    model.train()
    total_loss = 0
    hidden = model.init_hidden(batch_size)
    for i in range(0, len(data) - batch_size, batch_size):
        inputs = data[i:i+batch_size]
        targets = data[i+1:i+1+batch_size]
        model.zero_grad()
        inputs = inputs.to(device)
        if isinstance(hidden, tuple):  # For LSTM
            hidden = (hidden[0].to(device), hidden[1].to(device))
        else:  # For GRU
            hidden = hidden.to(device)

        output, hidden = model(inputs, hidden)

        # Detach hidden state after the forward pass
        if isinstance(hidden, tuple):  # For LSTM
            hidden = (hidden[0].detach(), hidden[1].detach())
        else:  # For GRU
            hidden = hidden.detach()
        loss = criterion(output.view(-1, vocab_size), targets.view(-1).to(device))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(data)

# Evaluation function
def evaluate_model(model, data, criterion, batch_size):
    model.eval()
    total_loss = 0
    hidden = model.init_hidden(batch_size)
    with torch.no_grad():
        for i in range(0, len(data) - batch_size, batch_size):
            inputs = data[i:i+batch_size]
            targets = data[i+1:i+1+batch_size]
            inputs = inputs.to(device)
            if isinstance(hidden, tuple):  # For LSTM
                hidden = (hidden[0].to(device), hidden[1].to(device))
            else:  # For GRU
                hidden = hidden.to(device)

            output, hidden = model(inputs, hidden)

            # Detach hidden state after the forward pass
            if isinstance(hidden, tuple):  # For LSTM
                hidden = (hidden[0].detach(), hidden[1].detach())
            else:  # For GRU
                hidden = hidden.detach()
            loss = criterion(output.view(-1, vocab_size), targets.view(-1).to(device))
            total_loss += loss.item()
    return total_loss / len(data)


# Training

In [16]:
batch_size = 32
embed_size = 200
hidden_size = 256
num_layers = 2
dropout_probs = [0.0]
learning_rates = [0.001]
num_epochs = 15

# Training loop with different hyperparameters
def run_experiments():
    criterion = nn.CrossEntropyLoss(reduction='sum')

    results = []

    for rnn_type in ['LSTM', 'GRU']:
        for dropout in dropout_probs:
            for lr in learning_rates:
                # Initialize model, optimizer
                model = RNNModel(rnn_type, vocab_size, embed_size, hidden_size, num_layers, dropout).to(device)
                optimizer = optim.SGD(model.parameters(), lr=lr)
                
                # Logging
                train_losses = []
                valid_losses = []
                
                for epoch in range(num_epochs):
                    train_loss = train_model(model, train_data, optimizer, criterion, batch_size)
                    valid_loss = evaluate_model(model, valid_data, criterion, batch_size)
                    
                    train_losses.append(train_loss)
                    valid_losses.append(valid_loss)
                    
                    print(f"Epoch {epoch+1}/{num_epochs} | RNN: {rnn_type} | Dropout: {dropout} | LR: {lr} | Train Loss: {train_loss:.4f} | Valid Loss: {valid_loss:.4f}")

                test_loss = evaluate_model(model, test_data, criterion, batch_size)
                perplexity = np.exp(test_loss)

                results.append({
                    'rnn_type': rnn_type,
                    'dropout': dropout,
                    'learning_rate': lr,
                    'test_perplexity': perplexity
                })
                # Plotting Train and Valid Perplexity
                plot_perplexity(train_losses, valid_losses, f'{rnn_type} - Dropout: {dropout}, LR: {lr}')

    # Summarize results in table
    print("\nSummary of Results:")
    for result in results:
        print(f"RNN Type: {result['rnn_type']}, Dropout: {result['dropout']}, LR: {result['learning_rate']}, Test Perplexity: {result['test_perplexity']:.2f}")

# Plotting function for Perplexity
def plot_perplexity(train_losses, valid_losses, title):
    plt.plot(np.exp(train_losses), label='Train Perplexity')
    plt.plot(np.exp(valid_losses), label='Valid Perplexity')
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel('Perplexity')
    plt.legend()
    plt.show()

# Run experiments
run_experiments()

RuntimeError: For unbatched 2-D input, hx and cx should also be 2-D but got (3-D, 3-D) tensors

# Plotting + Table

In [None]:
# epochs = range(1, 16)

# def plot_accuracy(epochs, train_accuracies, test_accuracies, title, color):
#     plt.plot(epochs, train_accuracies, color=color, label='Train Accuracy')
#     plt.plot(epochs, test_accuracies, color=color, linestyle='--', label='Test Accuracy')
#     plt.title(title)
#     plt.xlabel('Epochs')
#     plt.ylabel('Accuracy')
#     plt.legend()

# # Plotting convergence graphs
# plt.figure(figsize=(12, 8))

# plt.subplot(2, 2, 1)
# plt.axhline(y=88, color='black', linestyle='-', label='88% Threshold')
# model = result['model_no_reg']
# plot_accuracy(epochs, model['train_accuracies'], model['test_accuracies'], model['name'], color='r')

# plt.subplot(2, 2, 2)
# plt.axhline(y=88, color='black', linestyle='-', label='88% Threshold')
# model = result['model_with_dropout']
# plot_accuracy(epochs, model['train_accuracies'], model['test_accuracies'], model['name'], color='g')

# plt.subplot(2, 2, 3)
# plt.axhline(y=88, color='black', linestyle='-', label='88% Threshold')
# model = result['model_with_batchnorm']
# plot_accuracy(epochs, model['train_accuracies'], model['test_accuracies'], model['name'], color='b')

# plt.subplot(2, 2, 4)
# plt.axhline(y=88, color='black', linestyle='-', label='88% Threshold')
# model = result['model_with_weight_decay']
# plot_accuracy(epochs, model['train_accuracies'], model['test_accuracies'], model['name'], color='y')

# plt.tight_layout()
# plt.show()

# Testing