<a href="https://colab.research.google.com/github/a-mhamdi/nlp/blob/main/Jupyter/02_sequence_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sequence Processing Foundations
---



## Outline
1. [Dataset loading and processing](#preprocess)
1. [NN setup](#nn-setup)
  1. [Recurrent neural network (RNN)](#rnn)
  1. [Long short-term memory (LSTM) Network](#lstm)
  1. [Gated recurrent unit (GRU)](#gru)

In [None]:
import numpy as np

In [None]:
!pip install 'portalocker==2.8.2'

import portalocker

In [None]:
!pip install torch==2.2 torchtext==0.17

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

Check for GPU availability

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

Set random seed for reproducibility

In [None]:
# Fix the seed of PyTorch's default random number generator.
torch.manual_seed(42)

## Dataset loading and processing <a name="preprocess"></a>

Load IMDB dataset

In [None]:
# Create the IMDB train and test iterables. The rest of the notebook unpacks
# every item into two values: a label followed by the review text.
train_iter = IMDB(split='train')
test_iter = IMDB(split='test')
# Last expression of the cell: shows the training iterable object.
train_iter

In [None]:
# Peek at the first training example and print the Python type of its first
# element (the element that the collate function treats as the label).
for label, _text in train_iter:
    print(type(label))
    break


Define tokenizer

In [None]:
# torchtext's 'basic_english' tokenizer; called below as tokenizer(text) to
# split a string into a list of tokens.
tokenizer = get_tokenizer('basic_english')

Helper function to yield tokens

In [None]:
def yield_tokens(data_iter):
    """Lazily produce one token list per example in ``data_iter``.

    Every item of ``data_iter`` is unpacked into exactly two values; the
    first is ignored and the second is passed to the module-level
    ``tokenizer``.
    """
    yield from (tokenizer(review) for _label, review in data_iter)

Build vocabulary

In [None]:
# Build the vocabulary from the tokenized training reviews, registering the
# two special tokens "<unk>" and "<pad>".
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>", "<pad>"])
# Use the "<unk>" index as the default for tokens missing from the vocabulary.
vocab.set_default_index(vocab["<unk>"])

Text pipeline: tokenize and convert to indices

In [None]:
def text_pipeline(x):
    """Tokenize the string ``x`` and return the vocabulary index of each token."""
    return [vocab[token] for token in tokenizer(x)]

In [None]:
# Sanity check: show the index list produced for a short sentence.
text_pipeline("This is a test.")

In [None]:
# Index lookup for the single token "this"; the value is computed but it is
# not the final expression of the cell.
[ vocab[token] for token in tokenizer("this") ]
# Index of the "<unk>" special token, which is also the vocabulary's default index.
vocab["<unk>"]

Label pipeline: convert label to integer

In [None]:
def label_pipeline(x):
    """Convert a raw IMDB label into a binary class index.

    Returns 1 for a positive review and 0 otherwise.

    torchtext's IMDB dataset yields integer labels (1 = negative,
    2 = positive); older releases yielded the strings "neg" / "pos".
    Comparing only against "pos" would map every integer label to 0 and
    leave the classifier with a single class, so both encodings are accepted.
    """
    return 1 if x in ("pos", 2) else 0

Collate function for DataLoader

In [None]:
def collate_batch(batch):
    """Turn a list of ``(label, text)`` pairs into padded batch tensors.

    Returns:
        ``(padded_text, labels, lengths)`` where ``padded_text`` holds the
        token indices of every review padded (batch first) with the
        ``"<pad>"`` index, ``labels`` holds the int64 class indices, and
        ``lengths`` holds the unpadded token count of every review.
    """
    labels = torch.tensor(
        [label_pipeline(raw_label) for raw_label, _ in batch],
        dtype=torch.int64,
    )
    sequences = [
        torch.tensor(text_pipeline(raw_text), dtype=torch.int64)
        for _, raw_text in batch
    ]
    # Record the true lengths before padding hides them.
    lengths = torch.tensor([seq.size(0) for seq in sequences])
    padded_text = pad_sequence(
        sequences,
        batch_first=True,
        padding_value=vocab["<pad>"],
    )
    return padded_text, labels, lengths

Create DataLoader

In [None]:
batch_size = 32

# Fresh iterables for both splits; each one is materialised into a list
# before being handed to its DataLoader.
train_iter = IMDB(split='train')
test_iter = IMDB(split='test')

# Only the training loader shuffles its examples.
train_dataloader = DataLoader(
    list(train_iter),
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_batch,
)
test_dataloader = DataLoader(
    list(test_iter),
    batch_size=batch_size,
    collate_fn=collate_batch,
)

## NN setup <a name="nn-setup"></a>

Define model parameters

In [None]:
# Number of entries in the vocabulary; passed to each model as vocab_size.
VOCAB_SIZE = len(vocab)
# Size of each token embedding vector.
EMBEDDING_DIM = 100
# Size of the recurrent hidden state.
HIDDEN_DIM = 256
# Number of scores produced per example (class indices 0 and 1).
OUTPUT_DIM = 2

In [None]:
# Show the vocabulary size.
VOCAB_SIZE

Training function

In [None]:
def train(model, dataloader, optimizer, criterion):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Module called as ``model(text)`` that returns one score per
            class for every example.
        dataloader: Iterable of ``(text, labels, lengths)`` batches; the
            lengths are not used here.
        optimizer: Optimiser that updates the model's parameters.
        criterion: Loss called as ``criterion(predictions, labels)``.

    Returns:
        ``(mean_batch_loss, accuracy)``. The loss is the average of the
        per-batch loss values. The accuracy is the number of correct
        predictions divided by the number of examples seen, so a short final
        batch is not over-weighted. Both are ``0.0`` when ``dataloader``
        yields no batches.
    """
    model.train()

    # Send batches to wherever the model's weights live rather than relying
    # on a module-level device variable being in sync with the model.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else None

    total_loss = 0.0
    n_batches = 0
    n_correct = 0
    n_samples = 0

    for text, labels, _ in dataloader:
        if target is not None:
            text, labels = text.to(target), labels.to(target)

        optimizer.zero_grad()
        predictions = model(text)
        loss = criterion(predictions, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1
        n_correct += (predictions.argmax(1) == labels).sum().item()
        n_samples += len(labels)

    if n_batches == 0:
        return 0.0, 0.0
    accuracy = n_correct / n_samples if n_samples else 0.0
    return total_loss / n_batches, accuracy

Evaluation function

In [None]:
def evaluate(model, dataloader, criterion):
    """Score ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Module called as ``model(text)`` that returns one score per
            class for every example.
        dataloader: Iterable of ``(text, labels, lengths)`` batches; the
            lengths are not used here.
        criterion: Loss called as ``criterion(predictions, labels)``.

    Returns:
        ``(mean_batch_loss, accuracy)``. The loss is the average of the
        per-batch loss values. The accuracy is the number of correct
        predictions divided by the number of examples seen, so a short final
        batch is not over-weighted. Both are ``0.0`` when ``dataloader``
        yields no batches.
    """
    model.eval()

    # Send batches to wherever the model's weights live rather than relying
    # on a module-level device variable being in sync with the model.
    first_param = next(model.parameters(), None)
    target = first_param.device if first_param is not None else None

    total_loss = 0.0
    n_batches = 0
    n_correct = 0
    n_samples = 0

    with torch.no_grad():
        for text, labels, _ in dataloader:
            if target is not None:
                text, labels = text.to(target), labels.to(target)

            predictions = model(text)
            loss = criterion(predictions, labels)

            total_loss += loss.item()
            n_batches += 1
            n_correct += (predictions.argmax(1) == labels).sum().item()
            n_samples += len(labels)

    if n_batches == 0:
        return 0.0, 0.0
    accuracy = n_correct / n_samples if n_samples else 0.0
    return total_loss / n_batches, accuracy

Predict function for a single text input

In [None]:
def predict_sentiment(model, text):
    """Return the predicted class index for a single piece of text.

    Args:
        model: Module called as ``model(tensor)`` on a ``[1, seq_len]``
            tensor of token indices.
        text: Raw string, converted to indices with ``text_pipeline``.

    Returns:
        The index of the highest-scoring class as a Python int.

    Raises:
        ValueError: If ``text`` produces no tokens. An empty index list would
            otherwise become a float tensor and fail inside the model with an
            unrelated dtype error.
    """
    token_ids = text_pipeline(text)
    if not token_ids:
        raise ValueError("text must contain at least one token")

    model.eval()
    with torch.no_grad():
        # Explicit int64: embedding lookups require integer indices.
        text_tensor = torch.tensor(token_ids, dtype=torch.int64).unsqueeze(0).to(device)
        prediction = model(text_tensor)
        return prediction.argmax(1).item()

### Recurrent neural network (RNN) <a name="rnn"></a>


The RNN model consists of:

1. Embedding layer (vocab_size → embedding_dim)
1. RNN layer with basic hidden state
1. Dropout layer (dropout=0.5)
1. Fully connected layer (hidden_dim → output_dim)

In [None]:
class RNNModel(nn.Module):
    """Text classifier: embedding -> single-layer RNN -> dropout -> linear.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the RNN hidden state.
        output_dim: Number of scores produced per example.
        dropout: Dropout probability applied to the final hidden state.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, lengths=None):
        """Compute class scores for a batch of token-index sequences.

        Args:
            text: Integer tensor of shape ``[batch_size, seq_len]``.
            lengths: Optional true (unpadded) length of every sequence, as a
                tensor or list with ``batch_size`` entries, each >= 1. When
                given, the batch is packed so the final hidden state is read
                at each sequence's last real token instead of after the
                padding steps. When omitted, the whole padded sequence is
                processed.

        Returns:
            Tensor of shape ``[batch_size, output_dim]``.
        """
        embedded = self.embedding(text)  # [batch_size, seq_len, embedding_dim]
        if lengths is not None:
            # pack_padded_sequence needs the lengths on the CPU;
            # enforce_sorted=False lets the batch arrive in any order.
            embedded = nn.utils.rnn.pack_padded_sequence(
                embedded,
                torch.as_tensor(lengths, dtype=torch.int64).cpu(),
                batch_first=True,
                enforce_sorted=False,
            )
        _, hidden = self.rnn(embedded)  # hidden: [1, batch_size, hidden_dim]
        hidden = self.dropout(hidden.squeeze(0))  # [batch_size, hidden_dim]
        return self.fc(hidden)  # [batch_size, output_dim]


#### Initialize model, loss function, and optimizer

In [None]:
# Build the RNN classifier on the selected device, then create its loss and
# an Adam optimiser over its parameters.
model_rnn = RNNModel(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model_rnn.parameters(), lr=0.001)

#### Training loop

In [None]:
n_epochs = 5
for epoch in range(n_epochs):
    # One optimisation pass over the training set, then a scoring pass over
    # the test set.
    train_loss, train_acc = train(model_rnn, train_dataloader, optimizer, criterion)
    test_loss, test_acc = evaluate(model_rnn, test_dataloader, criterion)

    print(
        f'Epoch: {epoch+1}',
        f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
        f'\tTest Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%',
        sep='\n',
    )

#### Example prediction

In [None]:
# Classify one hand-written review with the trained RNN and report the result.
sample_text = "This movie is amazing! I really enjoyed it."
sentiment = predict_sentiment(model_rnn, sample_text)
print(
    f'Sample text: "{sample_text}"',
    f'Sentiment: {"Positive" if sentiment == 1 else "Negative"}',
    sep='\n',
)

In [None]:
# Inspect the first example of one training batch: its padded index tensor
# and its label. Stop after a single batch.
for text, labels, lengths in train_dataloader:
    print(text[0])
    print(labels[0])
    break

### Long short-term memory (LSTM) Network <a name="lstm"></a>


The LSTM model consists of:

1. Embedding layer (vocab_size → embedding_dim)
1. LSTM layer with forget, input, cell, and output gates
1. Optional bidirectional processing
1. Dropout layer (dropout=0.5)
1. Fully connected layer (hidden_dim → output_dim; the input is 2 × hidden_dim when bidirectional)

In [None]:
class LSTMModel(nn.Module):
    """Text classifier: embedding -> LSTM -> dropout -> linear.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state (per direction).
        output_dim: Number of scores produced per example.
        n_layers: Number of stacked LSTM layers.
        bidirectional: Whether to run the LSTM in both directions.
        dropout: Dropout probability applied to the final hidden state and,
            when ``n_layers > 1``, between LSTM layers.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=1, bidirectional=False, dropout=0.5):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=n_layers,
                           bidirectional=bidirectional,
                           batch_first=True,
                           # Inter-layer dropout only applies to stacked layers.
                           dropout=dropout if n_layers > 1 else 0)

        # A bidirectional LSTM feeds both final states to the classifier,
        # doubling its input width.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, lengths=None):
        """Compute class scores for a batch of token-index sequences.

        Args:
            text: Integer tensor of shape ``[batch_size, seq_len]``.
            lengths: Optional true (unpadded) length of every sequence, as a
                tensor or list with ``batch_size`` entries, each >= 1. When
                given, the batch is packed so the final hidden state is read
                at each sequence's last real token instead of after the
                padding steps. When omitted, the whole padded sequence is
                processed.

        Returns:
            Tensor of shape ``[batch_size, output_dim]``.
        """
        embedded = self.embedding(text)  # [batch_size, seq_len, embedding_dim]
        if lengths is not None:
            # pack_padded_sequence needs the lengths on the CPU;
            # enforce_sorted=False lets the batch arrive in any order.
            embedded = nn.utils.rnn.pack_padded_sequence(
                embedded,
                torch.as_tensor(lengths, dtype=torch.int64).cpu(),
                batch_first=True,
                enforce_sorted=False,
            )

        # LSTM returns: output, (hidden, cell); only the hidden state is used.
        _, (hidden, _cell) = self.lstm(embedded)

        if self.lstm.bidirectional:
            # Last two entries are the top layer's forward and backward states.
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]

        hidden = self.dropout(hidden)
        return self.fc(hidden)  # [batch_size, output_dim]

#### Initialize model, loss function, and optimizer

In [None]:
# Build the LSTM classifier on the selected device, then create its loss and
# an Adam optimiser over its parameters.
model_lstm = LSTMModel(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model_lstm.parameters(), lr=0.001)

#### Training loop

In [None]:
n_epochs = 5
for epoch in range(n_epochs):
    # One optimisation pass over the training set, then a scoring pass over
    # the test set.
    train_loss, train_acc = train(model_lstm, train_dataloader, optimizer, criterion)
    test_loss, test_acc = evaluate(model_lstm, test_dataloader, criterion)

    print(
        f'Epoch: {epoch+1}',
        f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
        f'\tTest Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%',
        sep='\n',
    )

#### Example prediction

In [None]:
# Classify one hand-written review with the trained LSTM and report the result.
sample_text = "This movie was fantastic! I really enjoyed it."
sentiment = predict_sentiment(model_lstm, sample_text)
print(
    f'Sample text: "{sample_text}"',
    f'Sentiment: {"Positive" if sentiment == 1 else "Negative"}',
    sep='\n',
)

### Gated recurrent unit (GRU) <a name="gru"></a>


The GRU model consists of:

1. Embedding layer (vocab_size → embedding_dim)
1. GRU layer with reset gate, update gate, and hidden state
1. Optional bidirectional processing
1. Dropout layer (dropout=0.5)
1. Fully connected layer (hidden_dim → output_dim; the input is 2 × hidden_dim when bidirectional)

In [None]:
class GRUModel(nn.Module):
    """Text classifier: embedding -> GRU -> dropout -> linear.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the GRU hidden state (per direction).
        output_dim: Number of scores produced per example.
        n_layers: Number of stacked GRU layers.
        bidirectional: Whether to run the GRU in both directions.
        dropout: Dropout probability applied to the final hidden state and,
            when ``n_layers > 1``, between GRU layers.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers=1, bidirectional=False, dropout=0.5):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim,
                          hidden_dim,
                          num_layers=n_layers,
                          bidirectional=bidirectional,
                          batch_first=True,
                          # Inter-layer dropout only applies to stacked layers.
                          dropout=dropout if n_layers > 1 else 0)

        # A bidirectional GRU feeds both final states to the classifier,
        # doubling its input width.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, lengths=None):
        """Compute class scores for a batch of token-index sequences.

        Args:
            text: Integer tensor of shape ``[batch_size, seq_len]``.
            lengths: Optional true (unpadded) length of every sequence, as a
                tensor or list with ``batch_size`` entries, each >= 1. When
                given, the batch is packed so the final hidden state is read
                at each sequence's last real token instead of after the
                padding steps. When omitted, the whole padded sequence is
                processed.

        Returns:
            Tensor of shape ``[batch_size, output_dim]``.
        """
        embedded = self.embedding(text)  # [batch_size, seq_len, embedding_dim]
        if lengths is not None:
            # pack_padded_sequence needs the lengths on the CPU;
            # enforce_sorted=False lets the batch arrive in any order.
            embedded = nn.utils.rnn.pack_padded_sequence(
                embedded,
                torch.as_tensor(lengths, dtype=torch.int64).cpu(),
                batch_first=True,
                enforce_sorted=False,
            )

        # GRU returns: output, hidden; only the hidden state is used.
        _, hidden = self.gru(embedded)

        if self.gru.bidirectional:
            # Last two entries are the top layer's forward and backward states.
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]

        hidden = self.dropout(hidden)
        return self.fc(hidden)  # [batch_size, output_dim]


#### Initialize model, loss function, and optimizer

In [None]:
# Build the GRU classifier on the selected device, then create its loss and
# an Adam optimiser over its parameters.
model_gru = GRUModel(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model_gru.parameters(), lr=0.001)

#### Training loop

In [None]:
n_epochs = 5
for epoch in range(n_epochs):
    # One optimisation pass over the training set, then a scoring pass over
    # the test set.
    train_loss, train_acc = train(model_gru, train_dataloader, optimizer, criterion)
    test_loss, test_acc = evaluate(model_gru, test_dataloader, criterion)

    print(
        f'Epoch: {epoch+1}',
        f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
        f'\tTest Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%',
        sep='\n',
    )

#### Example prediction

In [None]:
# Classify one hand-written review with the trained GRU and report the result.
sample_text = "This movie was fantastic! I really enjoyed it."
sentiment = predict_sentiment(model_gru, sample_text)
print(
    f'Sample text: "{sample_text}"',
    f'Sentiment: {"Positive" if sentiment == 1 else "Negative"}',
    sep='\n',
)