### NLP & Data Mining Endterm

### Divina Komal Dcunha | 2021NOVVUGP0016

In this notebook, a transformer encoder was implemented from scratch and trained on the IMDB dataset for binary sentiment classification. The configuration used below stacks four encoder layers (`NUM_LAYERS = 4`).

References were taken from the following link: https://github.com/HosseinZaredar/Transformer-from-Scratch?tab=readme-ov-file


The architecture includes:
1. Multi-Head Self-Attention: Captures relationships between words in the input sequence.
2. Feedforward Network: Applies a two-layer MLP with a ReLU activation to each position after self-attention.
3. Embedding Layer: Maps each input token to a dense vector representation that can capture semantic meaning.
4. Positional Encoding: Adds learned positional embeddings to the token embeddings so the model can use word order.
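
To make the self-attention item above concrete, here is a minimal pure-Python sketch of scaled dot-product attention for a single head. It is an illustration only, not the notebook's implementation (which uses PyTorch tensors and several heads); the function names and the toy vectors are assumptions made for this example.

```python
import math

def softmax(scores):
    # Subtract the maximum for numerical stability before exponentiating
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

def scaled_dot_product_attention(queries, keys, values):
    # Each argument is a list of vectors (one vector per token)
    d_k = len(keys[0])
    outputs = []
    for q in queries:
        # Similarity of this query with every key, scaled by sqrt(d_k)
        scores = [sum(qi * ki for qi, ki in zip(q, k)) / math.sqrt(d_k) for k in keys]
        weights = softmax(scores)
        # Weighted average of the value vectors
        out = [sum(w * v[j] for w, v in zip(weights, values)) for j in range(len(values[0]))]
        outputs.append(out)
    return outputs

# Toy "sentence" of three 2-dimensional token vectors (hypothetical values)
tokens = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
attended = scaled_dot_product_attention(tokens, tokens, tokens)
for row in attended:
    print([round(v, 3) for v in row])
```

Each output row is a weighted average of all value vectors, which is how a token's representation comes to depend on the other tokens in the sequence.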


In [None]:
# Future direction: deploy the trained model on a Raspberry Pi?

In [None]:
import torch as T
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import tensorflow_datasets as tfds

#Prepare and load the IMDB dataset via tfds.load()
imdb, info = tfds.load("imdb_reviews", with_info=True, as_supervised=True)
train_data, test_data = imdb['train'], imdb['test']

In [None]:
# Check dataset size via metadata 'info'
print("Total number of examples in the dataset:", info.splits.total_num_examples)
print("Number of training examples:", info.splits['train'].num_examples)
print("Number of testing examples:", info.splits['test'].num_examples)

Total number of examples in the dataset: 100000
Number of training examples: 25000
Number of testing examples: 25000


In [None]:
print(info.splits)

{'train': <SplitInfo num_examples=25000, num_shards=1>, 'test': <SplitInfo num_examples=25000, num_shards=1>, 'unsupervised': <SplitInfo num_examples=50000, num_shards=1>}


The `unsupervised` split holds 50,000 additional reviews without sentiment labels, which is why the total above is 100,000. Such data is typically used for unsupervised pretraining; it is not used in this notebook.

In [None]:
from collections import Counter

# Count positive and negative examples
train_labels = Counter(label.numpy() for _, label in train_data) #Ignoring review (review,label) -> (_, label)
test_labels = Counter(label.numpy() for _, label in test_data)

print("Training Data:")
print(f"  Positive examples: {train_labels[1]}")
print(f"  Negative examples: {train_labels[0]}")

print("\nTesting Data:")
print(f"  Positive examples: {test_labels[1]}")
print(f"  Negative examples: {test_labels[0]}")

Training Data:
  Positive examples: 12500
  Negative examples: 12500

Testing Data:
  Positive examples: 12500
  Negative examples: 12500


In [None]:
for example, label in train_data.take(5):  # Take 5 examples
    print("Review:", example.numpy().decode('utf-8'))  # Decode byte strings
    print("Label:", label.numpy())  # 0 or 1

Review: This was an absolutely terrible movie. Don't be lured in by Christopher Walken or Michael Ironside. Both are great actors, but this must simply be their worst role in history. Even their great acting could not redeem this movie's ridiculous storyline. This movie is an early nineties US propaganda piece. The most pathetic scenes were those when the Columbian rebels were making their cases for revolutions. Maria Conchita Alonso appeared phony, and her pseudo-love affair with Walken was nothing but a pathetic emotional plug in a movie that was devoid of any real meaning. I am disappointed that there are movies like this, ruining actor's like Christopher Walken's good name. I could barely sit through it.
Label: 0
Review: I have been known to fall asleep during films, but this is usually due to a combination of things including, really tired, being warm and comfortable on the sette and having just eaten a lot. However on this occasion I fell asleep because the film was rubbish. The 

In [None]:
# Step 1: Build Vocabulary
# Function to build vocabulary from the dataset using a tokenizer
# The vocabulary assigns a unique index to each token, reserving 0 for padding
def build_vocab(dataset, tokenizer):
    vocab = {}
    index = 1  # Starting index for vocabulary, reserve 0 for padding
    for text, _ in tfds.as_numpy(dataset):  # Convert dataset to NumPy format
        tokens = tokenizer(text.decode('utf-8'))  # Tokenize the text
        for token in tokens:  # Iterate through tokens
            if token not in vocab:  # If token not already in vocabulary
                vocab[token] = index  # Assign a unique index
                index += 1
    return vocab  # Return the constructed vocabulary

# Simple tokenizer function
# This function tokenizes text by splitting it on whitespace
def simple_tokenizer(text):
    return text.split()  # Tokenize by whitespace for simplicity

# Build vocab from training data
# Using the build_vocab function and simple_tokenizer to create a vocabulary
vocab = build_vocab(train_data, simple_tokenizer)
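
Splitting on whitespace keeps punctuation and capitalization attached to each word, so "Fantastic!" and "fantastic." become two separate vocabulary entries. The sketch below contrasts this with a normalizing tokenizer; `normalizing_tokenizer` is a hypothetical alternative, not something the notebook uses.

```python
import string

def simple_tokenizer(text):
    # Same behavior as the notebook's tokenizer: split on whitespace only
    return text.split()

def normalizing_tokenizer(text):
    # Hypothetical alternative: lowercase and strip surrounding punctuation
    tokens = (tok.strip(string.punctuation).lower() for tok in text.split())
    return [tok for tok in tokens if tok]

sample = "Fantastic! The movie was fantastic."
print(simple_tokenizer(sample))
# ['Fantastic!', 'The', 'movie', 'was', 'fantastic.']
print(normalizing_tokenizer(sample))
# ['fantastic', 'the', 'movie', 'was', 'fantastic']
```

Normalizing would likely shrink the vocabulary (and with it the embedding table), at the cost of discarding cues such as exclamation marks.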

In [None]:
# Step 2: Tokenizer with Vocabulary Closure
# Function to create a tokenizer using the given vocabulary
# Unknown words are mapped to index 0, the same index that is reserved for padding
def get_tokenizer(vocab):
    def tokenizer(text):
        return [vocab.get(word, 0) for word in text.split()]  # Use 0 for unknown words
    return tokenizer

# Tokenizer instance created using the vocabulary
tokenizer = get_tokenizer(vocab)

# Custom Dataset Class
# A PyTorch Dataset class for handling the IMDB dataset
class IMDBDataset(Dataset):
    def __init__(self, tf_dataset, tokenizer, max_length=256):
        self.data = list(tfds.as_numpy(tf_dataset))  # Convert to NumPy format
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)  # Number of samples in the dataset

    def __getitem__(self, idx):
        text, label = self.data[idx]  # Extract text and label
        tokens = self.tokenizer(text.decode('utf-8'))  # Tokenize the text
        # Pad or truncate tokens to the maximum length
        padded_tokens = tokens[:self.max_length] + [0] * max(0, self.max_length - len(tokens))
        return T.tensor(padded_tokens), T.tensor(label, dtype=T.long)

# Transformer Components (Embedding, Attention, Encoder)
# Embedding layer: Combines word and positional embeddings
class Embedding(nn.Module):
    def __init__(self, vocab_size, max_length, embed_dim, dropout=0.1):
        super(Embedding, self).__init__()
        self.word_embed = nn.Embedding(vocab_size, embed_dim)  # Word embeddings
        self.pos_embed = nn.Embedding(max_length, embed_dim)  # Positional embeddings
        self.dropout = nn.Dropout(dropout)  # Dropout for regularization

    def forward(self, x):
        batch_size, seq_length = x.shape
        # Generate position indices on the same device as the input tensor
        positions = T.arange(0, seq_length, device=x.device).expand(batch_size, seq_length)
        # Combine word and positional embeddings
        embedding = self.word_embed(x) + self.pos_embed(positions)
        return self.dropout(embedding)

# Multi-Head Self-Attention mechanism
class MHSelfAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super(MHSelfAttention, self).__init__()
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads  # Dimension of each attention head
        assert (self.num_heads * self.head_dim == self.embed_dim), \
            'Embed size must be divisible by the number of heads'
        # Linear layers for queries, keys, and values
        self.w_queries = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.w_keys = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.w_values = nn.Linear(self.embed_dim, self.embed_dim, bias=False)
        self.fc_out = nn.Linear(self.head_dim * self.num_heads, self.embed_dim)

    def forward(self, x):
        batch_size = x.shape[0]
        sentence_len = x.shape[1]
        # Compute queries, keys, and values for attention
        queries = self.w_queries(x).reshape(batch_size, sentence_len, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        keys = self.w_keys(x).reshape(batch_size, sentence_len, self.num_heads, self.head_dim).permute(0, 2, 3, 1)
        values = self.w_values(x).reshape(batch_size, sentence_len, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
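        # Compute scaled dot-product attention scores
        # Note: scores are scaled by sqrt(embed_dim) here; the standard formulation scales by sqrt(head_dim)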
        attention_scores = T.einsum('bijk,bikl->bijl', queries, keys)
        attention_dist = T.softmax(attention_scores / (self.embed_dim ** (1/2)), dim=-1)
        # Apply attention distribution to values
        attention_out = T.einsum('bijk,bikl->bijl', attention_dist, values)
        # Concatenate heads and pass through output linear layer
        concatenated_out = attention_out.permute(0, 2, 1, 3).reshape(batch_size, sentence_len, self.embed_dim)
        return self.fc_out(concatenated_out)

# Transformer Encoder block
class TransformerEncoder(nn.Module):
    def __init__(self, embed_dim, num_heads, forward_expansion, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.attention = MHSelfAttention(embed_dim, num_heads)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.feed_forward = nn.Sequential(
            nn.Linear(embed_dim, forward_expansion * embed_dim),
            nn.ReLU(),
            nn.Linear(forward_expansion * embed_dim, embed_dim)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # Apply multi-head self-attention and residual connection
        attention_out = self.dropout(self.attention(x))
        x = self.norm1(x + attention_out)
        # Apply feed-forward network and residual connection
        forward_out = self.dropout(self.feed_forward(x))
        return self.norm2(x + forward_out)

# Transformer Classifier Model
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, max_length, embed_dim, num_heads, forward_expansion, num_layers, output_dim):
        super(TransformerClassifier, self).__init__()
        self.embedder = Embedding(vocab_size, max_length, embed_dim)  # Embedding layer
        self.encoder = nn.ModuleList(
            [TransformerEncoder(embed_dim, num_heads, forward_expansion) for _ in range(num_layers)]
        )
        self.fc = nn.Linear(embed_dim, output_dim)  # Fully connected output layer

    def forward(self, x):
        x = self.embedder(x)  # Pass through embedding layer
        for layer in self.encoder:  # Pass through encoder layers
            x = layer(x)
        x = x.mean(dim=1)  # Global average pooling
        return self.fc(x)  # Output layer
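
The global average pooling in `TransformerClassifier.forward` averages over all `max_length` positions, so padding positions contribute to the pooled vector for short reviews. The pure-Python sketch below shows the pad-or-truncate step and a padding-aware mean as one possible alternative. `pad_or_truncate` and `masked_mean` are hypothetical helpers, and because this notebook also maps unknown words to index 0, masking on that index would skip unknown words as well.

```python
def pad_or_truncate(token_ids, max_length, pad_id=0):
    # Clip long sequences, then right-pad short ones with pad_id
    clipped = token_ids[:max_length]
    return clipped + [pad_id] * (max_length - len(clipped))

def masked_mean(vectors, token_ids, pad_id=0):
    # Average only the vectors whose token is not padding
    kept = [vec for vec, tok in zip(vectors, token_ids) if tok != pad_id]
    if not kept:
        return [0.0] * len(vectors[0])
    return [sum(col) / len(kept) for col in zip(*kept)]

ids = pad_or_truncate([4, 9], max_length=4)
print(ids)  # [4, 9, 0, 0]

# Toy per-position representations (hypothetical values)
vectors = [[1.0, 2.0], [3.0, 4.0], [0.0, 0.0], [0.0, 0.0]]
plain_mean = [sum(col) / len(vectors) for col in zip(*vectors)]
print(plain_mean)                 # [1.0, 1.5]
print(masked_mean(vectors, ids))  # [2.0, 3.0]
```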

In [None]:
# Model and Training Parameters
# Defining key parameters for the Transformer model and training
VOCAB_SIZE = len(vocab) + 1  # Vocabulary size (+1 for the padding token)
EMBED_DIM = 128  # Dimensionality of embedding vectors
NUM_HEADS = 8  # Number of attention heads
FORWARD_EXPANSION = 4  # Expansion factor for the feed-forward network
NUM_LAYERS = 4  # Number of encoder layers
OUTPUT_DIM = 2  # Output dimensions (binary classification: positive/negative sentiment)
MAX_LENGTH = 256  # Maximum sequence length
BATCH_SIZE = 32  # Batch size for training
LEARNING_RATE = 0.001  # Learning rate for optimizer

# DataLoaders
# Convert datasets into PyTorch DataLoader for batching and shuffling
train_dataset = IMDBDataset(train_data, tokenizer, max_length=MAX_LENGTH)  # Training dataset
test_dataset = IMDBDataset(test_data, tokenizer, max_length=MAX_LENGTH)  # Test dataset
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)  # DataLoader for training
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)  # DataLoader for testing

# Training Loop Setup
# Set up device, model, loss function, and optimizer
device = T.device('cuda' if T.cuda.is_available() else 'cpu')  # Use GPU if available
# Initialize Transformer model
model = TransformerClassifier(VOCAB_SIZE, MAX_LENGTH, EMBED_DIM, NUM_HEADS, FORWARD_EXPANSION, NUM_LAYERS, OUTPUT_DIM)
model.to(device)  # Move model to the appropriate device
criterion = nn.CrossEntropyLoss()  # Loss function for multi-class classification
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)  # Adam optimizer
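
As a rough sanity check on model size, the number of trainable parameters can be derived from the hyperparameters above. The sketch below mirrors the layers defined in this notebook (query, key, and value projections without bias; `fc_out`, the feed-forward layers, and the classifier with bias). The vocabulary size of 1,000 is a placeholder, since the real value of `len(vocab) + 1` is not printed anywhere in the notebook.

```python
def count_parameters(vocab_size, max_length=256, embed_dim=128,
                     forward_expansion=4, num_layers=4, output_dim=2):
    # Word and positional embedding tables
    embeddings = vocab_size * embed_dim + max_length * embed_dim
    # Q, K, V projections (no bias) plus fc_out (with bias)
    attention = 3 * embed_dim * embed_dim + (embed_dim * embed_dim + embed_dim)
    # Two LayerNorms, each with a weight and a bias vector
    layer_norms = 2 * (2 * embed_dim)
    # Two linear layers of the feed-forward network, both with bias
    hidden = forward_expansion * embed_dim
    feed_forward = (embed_dim * hidden + hidden) + (hidden * embed_dim + embed_dim)
    per_layer = attention + layer_norms + feed_forward
    # Final classification layer
    classifier = embed_dim * output_dim + output_dim
    return embeddings + num_layers * per_layer + classifier

# Placeholder vocabulary size, for illustration only
print(count_parameters(vocab_size=1000))  # 952578
```

Every additional vocabulary entry adds `embed_dim` parameters, so with a whitespace-tokenized vocabulary the word embedding table is likely to dominate the total.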

In [None]:
# Training Function
# Performs one epoch of training: forward pass, loss computation, backpropagation, and weight updates
def train(model, iterator, optimizer, criterion):
    epoch_loss, epoch_acc = 0, 0  # Initialize epoch loss and accuracy
    model.train()  # Set model to training mode
    for batch in iterator:  # Iterate over training batches
        inputs, labels = batch  # Unpack input data and labels
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to the appropriate device
        optimizer.zero_grad()  # Reset gradients
        predictions = model(inputs)  # Forward pass: get predictions
        loss = criterion(predictions, labels)  # Compute loss
        acc = (predictions.argmax(1) == labels).float().mean()  # Compute accuracy
        loss.backward()  # Backward pass: compute gradients
        optimizer.step()  # Update model weights
        epoch_loss += loss.item()  # Accumulate batch loss
        epoch_acc += acc.item()  # Accumulate batch accuracy
    return epoch_loss / len(iterator), epoch_acc / len(iterator)  # Return average loss and accuracy

# Evaluation Function
# Evaluates the model on validation/test data without updating weights
def evaluate(model, iterator, criterion):
    epoch_loss, epoch_acc = 0, 0  # Initialize epoch loss and accuracy
    model.eval()  # Set model to evaluation mode (disables dropout, etc.)
    with T.no_grad():  # Disable gradient computation for evaluation
        for batch in iterator:  # Iterate over validation/test batches
            inputs, labels = batch  # Unpack input data and labels
            inputs, labels = inputs.to(device), labels.to(device)  # Move data to the appropriate device
            predictions = model(inputs)  # Forward pass: get predictions
            loss = criterion(predictions, labels)  # Compute loss
            acc = (predictions.argmax(1) == labels).float().mean()  # Compute accuracy
            epoch_loss += loss.item()  # Accumulate batch loss
            epoch_acc += acc.item()  # Accumulate batch accuracy
    return epoch_loss / len(iterator), epoch_acc / len(iterator)  # Return average loss and accuracy
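
Both functions above average the per-batch accuracies, which gives a smaller final batch the same weight as a full one. With 25,000 examples and a batch size of 32, the last batch holds only 8 examples, so the effect is small here. The sketch below shows the difference on made-up counts; the helper names and numbers are assumptions for illustration.

```python
def mean_of_batch_accuracies(batches):
    # What train() and evaluate() compute: every batch weighs the same
    return sum(correct / size for correct, size in batches) / len(batches)

def sample_weighted_accuracy(batches):
    # Alternative: total correct predictions over total examples
    total_correct = sum(correct for correct, _ in batches)
    total = sum(size for _, size in batches)
    return total_correct / total

# Hypothetical (correct, batch_size) pairs; the last batch is smaller
batches = [(24, 32), (24, 32), (8, 8)]
print(round(mean_of_batch_accuracies(batches), 4))  # 0.8333
print(round(sample_weighted_accuracy(batches), 4))  # 0.7778
```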


In [None]:
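# Training and evaluation loop
# Note: the test split doubles as the validation set here, so the "Valid" metrics are test-set metrics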
num_epochs = 5
for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, test_loader, criterion)

    print(f"Epoch {epoch + 1}:")
    print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}")
    print(f"Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_acc:.4f}")

Epoch 1:
Train Loss: 0.5827, Train Accuracy: 0.6589
Valid Loss: 0.4710, Valid Accuracy: 0.7711
Epoch 2:
Train Loss: 0.3275, Train Accuracy: 0.8609
Valid Loss: 0.3604, Valid Accuracy: 0.8438
Epoch 3:
Train Loss: 0.1922, Train Accuracy: 0.9266
Valid Loss: 0.3837, Valid Accuracy: 0.8470
Epoch 4:
Train Loss: 0.1105, Train Accuracy: 0.9594
Valid Loss: 0.5561, Valid Accuracy: 0.8223
Epoch 5:
Train Loss: 0.0612, Train Accuracy: 0.9789
Valid Loss: 0.5894, Valid Accuracy: 0.8294
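
In the log above, training loss keeps falling while validation loss rises after epoch 2, which is the usual sign of overfitting. A minimal early-stopping sketch applied to the logged validation losses follows. The `patience` value and the function name are assumptions, and because the "Valid" numbers here come from the test split, a real setup would hold out part of the training data for this decision.

```python
def early_stopping_epoch(valid_losses, patience=2):
    # Returns (best_epoch, stop_epoch), both 1-indexed
    best_loss = float("inf")
    best_epoch = 0
    bad_epochs = 0
    for epoch, loss in enumerate(valid_losses, start=1):
        if loss < best_loss:
            best_loss, best_epoch, bad_epochs = loss, epoch, 0
        else:
            bad_epochs += 1
            if bad_epochs >= patience:
                return best_epoch, epoch
    return best_epoch, len(valid_losses)

# Validation losses copied from the training log above
valid_losses = [0.4710, 0.3604, 0.3837, 0.5561, 0.5894]
print(early_stopping_epoch(valid_losses))  # (2, 4)
```

With a patience of 2, training would stop after epoch 4 and the epoch-2 weights would be the ones to keep.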


In [None]:
# Final evaluation on the test set
test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}")

Test Loss: 0.5894, Test Accuracy: 0.8294


In [None]:
import torch

# Example sentiment mapping
label_map = {0: "negative", 1: "positive"}  # Mapping model output to human-readable labels

def classify_sentiment(model, tokenizer, input_text, device='cpu'):
    """
    Classifies the sentiment of input_text using the trained model.
    Args:
        model: Trained transformer model.
        tokenizer: Tokenizer function to preprocess input_text.
        input_text: The text to classify.
        device: 'cpu' or 'cuda' depending on your setup.
    Returns:
        Sentiment label (e.g., 'positive', 'negative').
    """
    model.eval()  # Set the model to evaluation mode (disables dropout, etc.)
    model.to(device)  # Move model to the specified device (CPU or GPU)

    with torch.no_grad():  # Disable gradient computations for efficiency
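        # Tokenize and convert input to tensor
        # Note: unlike the training inputs, the sequence is not padded to max_length here
        max_len = model.embedder.pos_embed.num_embeddings  # Longest sequence the positional embedding supports
        tokens = tokenizer(input_text)[:max_len]  # Tokenize and truncate the input text
        input_ids = torch.tensor(tokens).unsqueeze(0).to(device)  # Add batch dimension and move to device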

        # Pass input through the model to get logits (raw prediction scores)
        logits = model(input_ids)

        # Get the predicted label (index of the highest logit score)
        predicted_label = torch.argmax(logits, dim=1).item()
        return label_map[predicted_label]  # Map index to human-readable sentiment

# Ensure correct device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # Automatically select GPU if available

# Example usage-1
example_text = "The movie was absolutely fantastic! I loved it."  # Input text to classify
sentiment = classify_sentiment(model, tokenizer, example_text, device=device)  # Get sentiment
print(f"Sentiment: {sentiment}")  # Output the sentiment

Sentiment: negative
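
The clearly positive sentence above is classified as negative. One possible contributor (an assumption, not verified against the trained vocabulary) is that whitespace tokenization leaves "fantastic!" and "it." with punctuation attached, so they may be missing from the vocabulary and fall back to index 0. The helper below lists such out-of-vocabulary words; `out_of_vocabulary` and `toy_vocab` are hypothetical and stand in for the real `vocab`.

```python
def out_of_vocabulary(text, vocab):
    # Words that the closure-based tokenizer would map to index 0
    return [word for word in text.split() if word not in vocab]

# Toy vocabulary for illustration; the notebook's real vocab is built from the training set
toy_vocab = {"The": 1, "movie": 2, "was": 3, "absolutely": 4,
             "fantastic": 5, "I": 6, "loved": 7, "it": 8}

missing = out_of_vocabulary("The movie was absolutely fantastic! I loved it.", toy_vocab)
print(missing)  # ['fantastic!', 'it.']
```

Running the same check with the notebook's `vocab` would show whether the key sentiment words in a test sentence are actually known to the model.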


In [None]:
# Example usage-2
example_text = "I hated the olympics"
sentiment = classify_sentiment(model, tokenizer, example_text, device=device)
print(f"Sentiment: {sentiment}")

Sentiment: negative


In [None]:
# Example usage-3
example_text = "My laptop is amazing"
sentiment = classify_sentiment(model, tokenizer, example_text, device=device)
print(f"Sentiment: {sentiment}")

Sentiment: positive
