In [1]:
import os
import math
from typing import *
from tqdm.auto import tqdm
import torch
from torch import nn, Tensor, optim
from torch.utils.data import DataLoader
import datasets
from datasets import *
from transformers import AutoTokenizer, DataCollatorWithPadding
import project_paths as pp

In [2]:
class RNNCell(nn.Module):
    '''A basic RNN cell that implements a single step of recurrent processing.
    
    The cell takes an input vector and previous hidden state, combines them through
    linear transformations and a non-linear activation, and outputs the new hidden state.
    
    Args:
        input_size (int): Size of the input vector
        hidden_size (int): Size of the hidden state vector
        
    Attributes:
        input_to_hidden (nn.Linear): Linear transformation from input to hidden state
        hidden_to_hidden (nn.Linear): Linear transformation of previous hidden state
        activation (nn.Tanh): Non-linear activation function
    '''
    def __init__(self, input_size: int, hidden_size: int) -> None:
        super(RNNCell, self).__init__()

        self.register_buffer('input_size', torch.tensor(input_size))
        self.register_buffer('hidden_size', torch.tensor(hidden_size))
        
        self.input_to_hidden = nn.Linear(input_size, hidden_size, bias=True)
        self.hidden_to_hidden = nn.Linear(hidden_size, hidden_size, bias=True)
        self.activation = nn.Tanh()

    def forward(self, x: Tensor, h: Tensor) -> Tensor:
        x = self.activation(self.input_to_hidden(x) + self.hidden_to_hidden(h))
        return x

In [3]:
class RNN(nn.Module):
    def __init__(self, vocab_size: int, embedding_dim: int, hidden_size: int, output_size: int, num_layers: int = 1) -> None:
        '''A multi-layer RNN model for sequence processing.
        
        This RNN implementation processes sequences using multiple stacked RNN cells.
        Each layer processes the output from the previous layer, with the first layer
        processing embedded input tokens and the final layer feeding into an output layer.
        
        Args:
            vocab_size (int): Size of the vocabulary (number of unique tokens)
            embedding_dim (int): Dimension of the token embeddings
            hidden_size (int): Size of the hidden state in each RNN cell
            output_size (int): Size of the output vector
            num_layers (int, optional): Number of stacked RNN layers. Defaults to 1.
            
        Attributes:
            embedding (nn.Embedding): Embedding layer that converts token IDs to vectors
            rnn_cells (nn.ModuleList): List of RNN cells, one per layer
            output_layer (nn.Linear): Final linear transformation to output size
        '''
        super(RNN, self).__init__()

        self.register_buffer('vocab_size', torch.tensor(vocab_size))
        self.register_buffer('embedding_dim', torch.tensor(embedding_dim))
        self.register_buffer('hidden_size', torch.tensor(hidden_size))
        self.register_buffer('output_size', torch.tensor(output_size))
        self.register_buffer('num_layers', torch.tensor(num_layers))

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn_cells = nn.ModuleList()
        self.rnn_cells.append(RNNCell(embedding_dim, hidden_size))
        for layer_idx in range(1, num_layers):
            self.rnn_cells.append(RNNCell(hidden_size, hidden_size))
        self.output_layer = nn.Linear(hidden_size, output_size, bias=False)

    def forward(self, x: Tensor) -> Tensor:
        batch_size, seq_len = x.size()
        x = self.embedding(x)
        
        hidden_states = [torch.zeros(size=(batch_size, self.hidden_size), device=x.device) for layer_idx in range(self.num_layers)]
        for time_step in range(seq_len):
            input_to_rnn_cell = x[:, time_step, :]
            for layer_idx in range(self.num_layers):
                hidden_states[layer_idx] = self.rnn_cells[layer_idx](input_to_rnn_cell, hidden_states[layer_idx])
                input_to_rnn_cell = hidden_states[layer_idx]

        output = self.output_layer(hidden_states[-1])
        return output

In [4]:
dataset_folder_path = os.path.join(pp.aclImdb_dataset_folder_path, 'train')
dataset = datasets.load_from_disk(dataset_folder_path)

train_and_val_datasets = dataset.train_test_split(test_size=0.3)
train_dataset = train_and_val_datasets['train']
val_dataset = train_and_val_datasets['test']

len_train_dataset = len(train_dataset)
num_pos_instances_in_train_dataset = len(train_dataset.filter(lambda item: item['label'] == 'pos'))
num_neg_instances_in_train_dataset = len_train_dataset - num_pos_instances_in_train_dataset

len_val_dataset = len(val_dataset)
num_pos_instances_in_val_dataset = len(val_dataset.filter(lambda item: item['label'] == 'pos'))
num_neg_instances_in_val_dataset = len_val_dataset - num_pos_instances_in_val_dataset

Filter:   0%|          | 0/17500 [00:00<?, ? examples/s]

Filter:   0%|          | 0/7500 [00:00<?, ? examples/s]

In [5]:
tokenizer_folder_path = os.path.join(pp.word_piece_tokenizer_folder_path, '4096')
tokenizer = AutoTokenizer.from_pretrained(tokenizer_folder_path)
tokenizer.padding_side = 'left'

In [6]:
vocab_size = tokenizer.vocab_size
embedding_dim = 512
hidden_size = 512
output_size = 1
num_layers = 1

model = RNN(vocab_size, embedding_dim, hidden_size, output_size, num_layers)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

RNN(
  (embedding): Embedding(4096, 512)
  (rnn_cells): ModuleList(
    (0): RNNCell(
      (input_to_hidden): Linear(in_features=512, out_features=512, bias=True)
      (hidden_to_hidden): Linear(in_features=512, out_features=512, bias=True)
      (activation): Tanh()
    )
  )
  (output_layer): Linear(in_features=512, out_features=1, bias=False)
)

In [7]:
learning_rate = 1e-4
train_batch_size = 64
val_batch_size = 64
num_epochs = 5
num_train_batches = math.ceil(len(train_dataset) / train_batch_size)
num_val_batches = math.ceil(len(val_dataset) / val_batch_size)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

In [8]:
label_map = {'neg': 0, 'pos': 1}
def collate_fn(batch):
    texts = [item['text'] for item in batch]
    labels = [[label_map[item['label']]] for item in batch]    
    encodings = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')
    input_ids = encodings['input_ids']
    # attention_mask = encodings['attention_mask']
    labels = torch.tensor(labels, dtype=torch.float32)

    return input_ids, labels

train_dataloader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=val_batch_size, shuffle=True, collate_fn=collate_fn)

In [9]:
progress_bar = tqdm(total=num_epochs * num_train_batches, dynamic_ncols=True)
for epoch_idx in range(num_epochs):
    model.train()
    epoch_loss = 0

    for batch_idx, batch in enumerate(train_dataloader):
        input_ids, labels = [item.to(device) for item in batch]

        optimizer.zero_grad()
        output_logits = model(input_ids)

        loss = criterion(output_logits, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

        progress_bar.set_description(f'Batch loss: {round(loss.item(), 3)}')
        progress_bar.update(1)

    avg_loss = epoch_loss / len_train_dataset
    print(f'Epoch {epoch_idx + 1} average loss: {round(avg_loss, 3)}')
progress_bar.close()

  0%|                                                                                                         …

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Epoch 1 average loss: 0.011
Epoch 2 average loss: 0.009
Epoch 3 average loss: 0.008
Epoch 4 average loss: 0.009
Epoch 5 average loss: 0.008


In [12]:
model = RNN(vocab_size, embedding_dim, hidden_size, output_size, num_layers)
model_save_file_path = os.path.join(pp.rnn_models_folder_path, '01.pth')
model.load_state_dict(torch.load(model_save_file_path))

  model.load_state_dict(torch.load(model_save_file_path))


<All keys matched successfully>

In [13]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()

RNN(
  (embedding): Embedding(4096, 512)
  (rnn_cells): ModuleList(
    (0): RNNCell(
      (input_to_hidden): Linear(in_features=512, out_features=512, bias=True)
      (hidden_to_hidden): Linear(in_features=512, out_features=512, bias=True)
      (activation): Tanh()
    )
  )
  (output_layer): Linear(in_features=512, out_features=1, bias=False)
)

In [14]:
tp = fp = tn = fn = 0
progress_bar = tqdm(total=num_val_batches, dynamic_ncols=True)
for batch_idx, batch in enumerate(val_dataloader):
    input_ids, labels = [item.to(device) for item in batch]

    output_logits = model(input_ids)
    probs = nn.functional.sigmoid(output_logits)
    probs[probs >= 0.5] = 1
    probs[probs < 0.5] = 0

    tp += ((probs == 1.0) & (labels == 1.0)).sum()
    fp += ((probs == 1.0) & (labels == 0.0)).sum()
    tn += ((probs == 0.0) & (labels == 0.0)).sum()
    fn += ((probs == 0.0) & (labels == 1.0)).sum()
    progress_bar.update(1)

progress_bar.close()

accuracy = (tp + tn) / (tp + fp + tn + fn)
precision = (tp) / (tp + fp)
recall = (tp) / (tp + fn)

print(f'Accuracy: {accuracy}')
print(f'Precision: {precision}')
print(f'Recall: {recall}')

  0%|                                                                                                         …

Accuracy: 0.7550666928291321
Precision: 0.7969143390655518
Recall: 0.6798281073570251
