# 2025 CITS4012 Project

_Make sure you change the file name with your group id._


In [None]:
import pandas as pd
import numpy as np
import sys
import os 
import seaborn as sns
import matplotlib.pyplot as plt 
from sklearn.metrics import classification_report, confusion_matrix
import copy
import random

import nltk
if not nltk.download('punkt'):
    nltk.download('punkt')
    nltk.download('punkt_tab')
from nltk.tokenize import word_tokenize

if not nltk.download('stopwords'):
    nltk.download('stopwords')

from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))
from sklearn.utils.class_weight import compute_class_weight

import re
import pandas as pd
import numpy as np

# Stemmer and Lemmatizer
from nltk.stem import PorterStemmer, WordNetLemmatizer
from nltk.corpus import wordnet
from nltk import pos_tag, word_tokenize

if not nltk.download('wordnet'):
    nltk.download('wordnet')
    nltk.download('omw-1.4')
    nltk.download('averaged_perceptron_tagger')

sys.path.append(os.path.abspath('..')) 
import math


import torch
import torch, torch.nn as nn, torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

print("CUDA available:", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("Number of GPUs:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("GPU name:", torch.cuda.get_device_name(0))

sys.path.append(os.path.abspath('../')) # Points to the current folder 

In [None]:
import time
import math

# Helper functions for training
def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


### **We build a Configuration class to have all the hyperparameters together**


In [None]:
# Class construct for hyperparameters 
class Config:
    def __init__(self):
        self.min_word_len = 3
        self.max_word_len = 64
        self.batch_size = 64
        self.embedding_dim = 200
        self.hidden_dim = 64
        self.max_combined_len = 130 #to join premise + hypothesis + cls + sep 
        self.patience = 5
        #Embedding
        self.embedding_method = ""
        self.emb_lr = 0.0005
        self.emb_epoch = 3
        self.emb_window_size = 5
        # Models
        self.n_heads = 2
        self.n_layers = 1
        self.dropout = 0.3
        self.num_classes = 2
        self.lr = 0.00005
        self.wd = 0.0005
        self.train_num_epoch = 7
        self.label_smoothing = 0.0

    def display(self): # Display all the config parameters 
        print("=" * 60)
        print("CONFIGURATION PARAMETERS")
        print("=" * 60)
        print("\nEmbedding Parameters:")
        print(f"  Method: {self.embedding_method}")
        print(f"  Max length: {self.max_word_len}")
        print(f"  Learning rate: {self.emb_lr}")
        print(f"  Embedding size: {self.embedding_dim}")
        print(f"  Epochs: {self.emb_epoch}")
        print(f"  Window size: {self.emb_window_size}")
        
        print("\nTransformer Parameters:")
        print(f"  Batch size: {self.batch_size}")
        print(f"  Number of heads: {self.n_heads}")
        print(f"  Number of layers: {self.n_layers}")
        print(f"  Dropout: {self.dropout}")
        print(f"  Number of classes: {self.num_classes}")
        print(f"  Learning rate: {self.lr}")
        print(f"  Weight decay: {self.wd}")
        print(f"  Training epochs: {self.train_num_epoch}")

        print(f"  Label smoothing: {self.label_smoothing}")
        print("=" * 60)


In [None]:
hyperparameters = Config()

# Readme


# 1.Dataset Processing


In [None]:

# These are common English contractions.
contraction_dict = {"ain't": "is not", "aren't": "are not","can't": "cannot", "'cause": "because", "could've": "could have",
                    "couldn't": "could not", "didn't": "did not",  "doesn't": "does not", "don't": "do not", "hadn't": "had not",
                    "hasn't": "has not", "haven't": "have not", "he'd": "he would","he'll": "he will", "he's": "he is", "how'd": "how did",
                    "how'd'y": "how do you", "how'll": "how will", "how's": "how is",  "I'd": "I would", "I'd've": "I would have",
                    "I'll": "I will", "I'll've": "I will have","I'm": "I am", "I've": "I have", "i'd": "i would", "i'd've": "i would have",
                    "i'll": "i will",  "i'll've": "i will have","i'm": "i am", "i've": "i have", "isn't": "is not", "it'd": "it would",
                    "it'd've": "it would have", "it'll": "it will", "it'll've": "it will have","it's": "it is", "let's": "let us",
                    "ma'am": "madam", "mayn't": "may not", "might've": "might have","mightn't": "might not","mightn't've": "might not have",
                    "must've": "must have", "mustn't": "must not", "mustn't've": "must not have", "needn't": "need not", "needn't've": "need not have",
                    "o'clock": "of the clock", "oughtn't": "ought not", "oughtn't've": "ought not have", "shan't": "shall not", "sha'n't": "shall not",
                    "shan't've": "shall not have", "she'd": "she would", "she'd've": "she would have", "she'll": "she will", "she'll've": "she will have",
                    "she's": "she is", "should've": "should have", "shouldn't": "should not", "shouldn't've": "should not have", "so've": "so have",
                    "so's": "so as", "this's": "this is","that'd": "that would", "that'd've": "that would have", "that's": "that is", "there'd": "there would",
                    "there'd've": "there would have", "there's": "there is", "here's": "here is","they'd": "they would", "they'd've": "they would have",
                    "they'll": "they will", "they'll've": "they will have", "they're": "they are", "they've": "they have", "to've": "to have", "wasn't": "was not",
                    "we'd": "we would", "we'd've": "we would have", "we'll": "we will", "we'll've": "we will have", "we're": "we are", "we've": "we have",
                    "weren't": "were not", "what'll": "what will", "what'll've": "what will have", "what're": "what are",  "what's": "what is", "what've": "what have",
                    "when's": "when is", "when've": "when have", "where'd": "where did", "where's": "where is", "where've": "where have", "who'll": "who will",
                    "who'll've": "who will have", "who's": "who is", "who've": "who have", "why's": "why is", "why've": "why have", "will've": "will have",
                    "won't": "will not", "won't've": "will not have", "would've": "would have", "wouldn't": "would not", "wouldn't've": "would not have",
                    "y'all": "you all", "y'all'd": "you all would","y'all'd've": "you all would have","y'all're": "you all are","y'all've": "you all have",
                    "you'd": "you would", "you'd've": "you would have", "you'll": "you will", "you'll've": "you will have", "you're": "you are", "you've": "you have"}


# Helper function for lemmatization with POS tagging
def get_wordnet_pos(treebank_tag):
    if treebank_tag.startswith('J'):
        return wordnet.ADJ
    elif treebank_tag.startswith('V'):
        return wordnet.VERB
    elif treebank_tag.startswith('N'):
        return wordnet.NOUN
    elif treebank_tag.startswith('R'):
        return wordnet.ADV
    else:
        return wordnet.NOUN  # Default to noun if no match
    
# Allowed POS tags for filtering (example: nouns, verbs, adjectives, adverbs)
allowed_pos_tags = {'NN', 'NNS', 'NNP', 'NNPS',   # Nouns
                    'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ',  # Verbs
                    'JJ', 'JJR', 'JJS',           # Adjectives
                    'RB', 'RBR', 'RBS'}           # Adverbs

# Filter the desired POS tags
def filter_tokens_by_pos(tokens):
    tagged_tokens = pos_tag(tokens)
    filtered = [word for word, tag in tagged_tokens if tag in allowed_pos_tags]
    return filtered

def clean_dataset(dataset, min_word_len, max_word_len, method=['stem, lemmatize'], pos_filter=False, stop_w=False, clean_nums = False):
    cleaned_premises = []
    cleaned_hypotheses = []
    cleaned_labels = []

    for index, row in dataset.iterrows():
        premise = row['premise']
        hypothesis = row['hypothesis']
        label = row['label']

        # Lowercase
        premise = premise.lower()
        hypothesis = hypothesis.lower()

        # Expand contractions
        for contraction, full_form in contraction_dict.items():
            premise = premise.replace(contraction, full_form)
            hypothesis = hypothesis.replace(contraction, full_form)

        # Remove punctuation/special chars
        premise = re.sub(r'[^a-zA-Z0-9\s.-]', ' ', premise)
        hypothesis = re.sub(r'[^a-zA-Z0-9\s.-]', ' ', hypothesis)

        # Replace underscores/hyphens with spaces, then normalize whitespace
        premise = re.sub(r'[-–—_]+', ' ', premise)
        hypothesis = re.sub(r'[-–—_]+', ' ', hypothesis)

        # Normalize whitespace
        premise = re.sub(r'\s+', ' ', premise).strip()
        hypothesis = re.sub(r'\s+', ' ', hypothesis).strip()

        # Tokenization
        premise_tokens = word_tokenize(premise)
        hypothesis_tokens = word_tokenize(hypothesis)

        # # Replace numbers with '<NUM>'
        if clean_nums is True:
            premise_tokens = ['[NUM]' if any(char.isdigit() for char in word) else word for word in premise_tokens]
            hypothesis_tokens = ['[NUM]' if any(char.isdigit() for char in word) else word for word in hypothesis_tokens]


        # Stemming/Lemmatization
        if 'stem' in method:
            stemmer = PorterStemmer()
            premise_tokens = [stemmer.stem(word) for word in premise_tokens]
            hypothesis_tokens = [stemmer.stem(word) for word in hypothesis_tokens]
        elif 'lemmatize' in method:
            lemmatizer = WordNetLemmatizer()
            premise_pos_tags = pos_tag(premise_tokens)
            hypothesis_pos_tags = pos_tag(hypothesis_tokens)
            premise_tokens = [lemmatizer.lemmatize(word, get_wordnet_pos(pos)) for word, pos in premise_pos_tags]
            hypothesis_tokens = [lemmatizer.lemmatize(word, get_wordnet_pos(pos)) for word, pos in hypothesis_pos_tags]

        # POS Filtering
        if pos_filter:
            premise_tokens = filter_tokens_by_pos(premise_tokens)
            hypothesis_tokens = filter_tokens_by_pos(hypothesis_tokens)

        # Remove stop words
        if stop_w:
            premise_tokens = [word for word in premise_tokens if word not in stop_words]
            hypothesis_tokens = [word for word in hypothesis_tokens if word not in stop_words]

        # Now check token length AFTER cleaning
        if (min_word_len <= len(premise_tokens) <= max_word_len and
            min_word_len <= len(hypothesis_tokens) <= max_word_len):
            cleaned_premises.append(premise_tokens)
            cleaned_hypotheses.append(hypothesis_tokens)
            cleaned_labels.append(label)
        # else: skip row

    # Build DataFrame from all cleaned token lists
    new_dataset = pd.DataFrame({
        'premise': cleaned_premises,
        'hypothesis': cleaned_hypotheses,
        'label': cleaned_labels
    })

    return new_dataset

## 1.1 Make the Datasets


In [None]:
# Load raw data
train_df = pd.read_json('../Dataset/train.json')
test_df = pd.read_json('../Dataset/test.json')
validation_df = pd.read_json('../Dataset/validation.json')

# Clean datasets. MAX_WORD_LENGTH set to 64 to remove very long texts
clean_train_dataset = clean_dataset(train_df, hyperparameters.min_word_len, hyperparameters.max_word_len, stop_w=True, method=[], pos_filter=False)
clean_test_dataset = clean_dataset(test_df, hyperparameters.min_word_len, hyperparameters.max_word_len,stop_w=True, method=[], pos_filter= False )
clean_validation_dataset = clean_dataset(validation_df, hyperparameters.min_word_len, hyperparameters.max_word_len,stop_w=True, method=[], pos_filter= False )

# Combine clean premises and hypotheses
clean_t_dataset = clean_train_dataset['premise'] + clean_train_dataset['hypothesis']
clean_t_dataset.head()

In [None]:
sum(train_df['label']=='entails') / (sum(train_df['label']!='entails') + sum(train_df['label']=='entails'))

In [None]:
sum(validation_df['label']=='entails') / (sum(validation_df['label']!='entails') + sum(validation_df['label']=='entails'))

In [None]:
sum(test_df['label']=='entails') / (sum(test_df['label']!='entails') + sum(test_df['label']=='entails'))

In [None]:
# Create a unique word list from the cleaned dataset
unique_words = set()
for sentence in clean_t_dataset: # Both Premise and Hypothesis 
    for word in sentence:
        unique_words.add(word)
        
unique_words_list = sorted(list(unique_words))

# Make dictionary of words and indices
word2id = {w:i for i,w in enumerate(unique_words_list)}
id2word = {i:w for i,w in enumerate(unique_words_list)}


# Add special tokens to use later
SPECIAL_TOKENS = ['[PAD]','[UNK]','[CLS]','[SEP]'] 
for tok in SPECIAL_TOKENS:
    if tok not in word2id:
        idx = len(word2id)
        word2id[tok] = idx
        id2word[idx] = tok


In [None]:
SEP_ID = word2id['[SEP]']
UNK_ID = word2id['[UNK]']
PAD_ID = word2id['[PAD]']
CLS_ID = word2id['[CLS]']


def prepare_indexed_data(df, word2id, max_len):
    input_ids = []
    attention_mask = []
    labels = []

    label_map = {'neutral': 0, 'entails': 1}

    for _, row in df.iterrows():
        premise_toks = row['premise']
        hypothesis_toks = row['hypothesis']
        label = row['label']

        # Add special tokens 
        tokens = [CLS_ID] \
                + [word2id.get(w,UNK_ID) for w in premise_toks] \
                + [SEP_ID] \
                + [word2id.get(w,UNK_ID) for w in hypothesis_toks] 
        # Truncate
        tokens = tokens[:max_len] #to combine len of hypothesis and premise, plus cls and sep  

        # Attention mask 
        attn = [1] * len(tokens)

        # Pad
        pad_len = max_len - len(tokens) # To fill the [PAD]
        if pad_len > 0:
            tokens += [PAD_ID] * pad_len
            attn += [0] * pad_len # FLag positions as padding 

        input_ids.append(tokens)
        attention_mask.append(attn)
        labels.append(label_map[label])

    return (torch.LongTensor(input_ids),
            np.array(input_ids),
            torch.LongTensor(attention_mask),
            torch.LongTensor(labels))

In [None]:
# Build train/test
x_train, plain_train, train_masks, y_train = prepare_indexed_data(clean_train_dataset, word2id, hyperparameters.max_combined_len)
x_test,  plain_test, test_masks,  y_test  = prepare_indexed_data(clean_test_dataset,  word2id, hyperparameters.max_combined_len)
x_valid,  plain_valid, valid_masks,  y_valid  = prepare_indexed_data(clean_validation_dataset,  word2id, hyperparameters.max_combined_len)

print("Shape of x_train:", x_train.shape)
print("Shape of train_masks:", train_masks.shape)
print("Shape of y_train:", y_train.shape)

# Sanity Check 
i = 0
print("\n--- Example ---")
print("First example real length (mask sum):", int(train_masks[i].sum()))
print("First example PAD count:", int((train_masks[i]==0).sum()))

# Check we didn't cut off too much
sns.histplot(np.sum(train_masks.numpy(), axis=1), bins = 16)
plt.axvline(hyperparameters.max_word_len, color='red')
plt.show()

# Prepare the datasets for model training
train_ds = TensorDataset(x_train, train_masks, y_train)
test_ds = TensorDataset(x_test, test_masks,y_test)
valid_ds = TensorDataset(x_valid, valid_masks,y_valid)

# Dataloaders (shuffle for train)
train_loader = DataLoader(train_ds, batch_size=hyperparameters.batch_size, shuffle=True, num_workers=0)
test_loader = DataLoader(test_ds, batch_size=hyperparameters.batch_size, shuffle=False, num_workers=0)
valid_loader = DataLoader(valid_ds, batch_size=hyperparameters.batch_size, shuffle=False, num_workers=0)

In [None]:
# Get the first batch from the train_loader Get the first batch from the train_loader

## 1.1 Make an embedding model


In [None]:
from gensim.models import FastText

fast_text_model = FastText(clean_t_dataset, # Both premise and Hypothesis 
                           vector_size=hyperparameters.embedding_dim,
                           window=hyperparameters.emb_window_size,
                           sg=1,
                           epochs=hyperparameters.emb_epoch
                           )

fast_text_model.wv.most_similar('saturn', topn=10)

def build_embedding_matrix(word2id, pretrained_vectors, embedding_size):
    vocab_size = len(word2id)
    embedding_matrix = np.zeros((vocab_size, embedding_size), dtype=np.float32)

    # Fill the matrix with the pre-trained vectors
    for word, idx in word2id.items():
        if word == '[PAD]':# If [PAD] ignore 
            continue
        try:
            vec = pretrained_vectors[word] #Checks the size of the vector of each word
            if vec.shape[0] == embedding_size:
                embedding_matrix[idx] = vec.astype(np.float32)
            else:
                # fallback if dims don’t match
                embedding_matrix[idx] = np.random.normal(0.0, 0.02, size=(embedding_size,)).astype(np.float32)
        except KeyError:
            # special tokens or OOV start them with random values 
            embedding_matrix[idx] = np.random.normal(0.0, 0.02, size=(embedding_size,)).astype(np.float32)

    print("Embedding matrix created with shape:", embedding_matrix.shape)

    return embedding_matrix


embedding_matrix = build_embedding_matrix(word2id, fast_text_model.wv, hyperparameters.embedding_dim)

In [None]:
# Look at embedding matrix with random words
candidates = [i for i in range(embedding_matrix.shape[0]) 
              if id2word.get(i) not in (None, "[PAD]")]

rng = np.random
chosen = rng.choice(candidates, size=min(5, len(candidates)), replace=False)

print(f"{'idx':>4}  {'token':<20}  first 5 dims")
print("-" * 80)
for i in chosen:
    token = id2word[i]
    print(f"{i:>4}  {token:<20}  {embedding_matrix[i][:5]}")

# 2. Model Implementation


Note we have collected following parameters from the pre-processing:

- VOCAB_SIZE
- MAX_SEQ_LEN
- BATCH_SIZE
- embedding_dim
  We have collected the objects:
- embedding_matrix
- fast_text_model
- clean_train_dataset, clean_test_dataset, clean_validation_dataset


**We build a Configuration class to have all the hyperparameters together**


## Positional Encoding


## Model A: RNN with Attention


### LSTM With Attention


In [None]:
class LSTMAttentionModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim=2, dropout=0.3, num_layers = 2):
        super(LSTMAttentionModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True, num_layers=num_layers, dropout=dropout)
        self.attention = nn.Linear(hidden_dim * 2, 1)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        
    def forward(self, x, mask=None, return_weights = False):
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        attn_weights = torch.softmax(self.attention(lstm_out), dim=1)
        context = torch.sum(attn_weights * lstm_out, dim=1)
        context = self.dropout(context)

        logits = self.fc(context)
        if return_weights:
            return logits, attn_weights
        else:
            return logits

### Gru with Attention


In [None]:
class GRUAttentionModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, dropout=0.3, unk_prob=0.1, num_layers = 2):
        super(GRUAttentionModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True, bidirectional=True, num_layers=num_layers, dropout=dropout)
        self.norm = nn.LayerNorm(hidden_dim * 2)
        self.attention = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1))
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim * 2, 2)
        self.unk_id = UNK_ID
        self.unk_prob = unk_prob

    def forward(self, x, mask=None, return_weights = False):
        # Only replace tokens during training
        if self.training and self.unk_prob > 0:
            mask = (x != PAD_ID) & (x != SEP_ID) & (x != CLS_ID) & (x != UNK_ID)
            replace_mask = (torch.rand_like(x.float()) < self.unk_prob) & mask
            x = torch.where(replace_mask, torch.full_like(x, self.unk_id), x)
        # Rest of the forward pass...
        embedded = self.embedding(x)
        gru_out, _ = self.gru(embedded)
        gru_out = self.norm(gru_out)
        attn_weights = torch.softmax(self.attention(gru_out), dim=1)
        context = torch.sum(attn_weights * gru_out, dim=1)
        context = self.dropout(context)

        logits = self.fc(context)
        # Possibiity to return the attention weights 
        if return_weights:
            return logits, attn_weights
        else:
            return logits

## Model B: Transformer with Attention


### Positional Encoding for the transformer


In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self,d_model,max_len = 512, dropout = 0.1):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0,max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0,d_model,2).float() * (-math.log(10000.0) / d_model))
        pe[:,0::2] = torch.sin(position * div_term)
        pe[:,1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):
        x = x + self.pe[:, :x.size(1), :] 
        return self.dropout(x)

In [None]:
class TransformerClassifier(nn.Module):
    def __init__(self, embedding_matrix, num_classes, n_heads, n_layers, dropout):
        super().__init__()
        vocab_size, embed_dim = embedding_matrix.shape
        self.d_model = embed_dim

        # Embedding Layer
        self.embedding = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix), freeze=False, padding_idx=word2id['[PAD]'])

        # Positional Encoding
        self.pos_encoder = PositionalEncoding(embed_dim, max_len=hyperparameters.max_combined_len, dropout=hyperparameters.dropout)

        # Transformer Encoder Layers
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=n_heads,
            dropout=hyperparameters.dropout,
            dim_feedforward=(embed_dim*4),
            batch_first=True 
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer,num_layers=n_layers)

        # Final Classification Head
        #self.classifier = nn.Linear(embed_dim,num_classes)

        self.classifier = nn.Sequential(
            nn.LayerNorm(embed_dim),
            nn.Dropout(dropout),
            nn.Linear(embed_dim, embed_dim // 2),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(embed_dim // 2, num_classes)
        )

    def forward(self, src, src_mask):
        padding_mask = (src_mask == 0)

        # Apply embedding and positional encoding
        embedded = self.embedding(src) * math.sqrt(self.d_model)
        pos_encoded = self.pos_encoder(embedded)

        # Pass trough the transformer encoder
        encoded = self.transformer_encoder(pos_encoded,src_key_padding_mask = padding_mask)

        # Use the output of the [CLS] token for classification
        cls_output = encoded[:,0,:]

        # Get final logits from the classifier
        logits = self.classifier(cls_output)
        return logits
    
    # Get attention from the first layer
    def get_attention_weights(self, src, src_mask):
        # Get embeddings and positional encoding
        padding_mask = (src_mask == 0)
        embedded = self.embedding(src) * math.sqrt(self.d_model)
        pos_encoded = self.pos_encoder(embedded)

        # Access the FIRST encoder layer 
        first_encoder_layer = self.transformer_encoder.layers[0]

        # Call its self-attention module with need_weights=True
        _, attn_weights = first_encoder_layer.self_attn(
            pos_encoded, pos_encoded, pos_encoded,
            key_padding_mask=padding_mask,
            need_weights=True,
            average_attn_weights=False
        )

        return attn_weights.cpu().detach()

## Model C: Wide and Deep RNN


This model seeks to build on Model A by splitting into three components


In [None]:
class GRUThreeStreamNLI(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, wide_layers=2, prem_layers=2, hypo_layers=2, num_fc_layers=2, embedding_dropout=0.2, dropout=0.3, unk_prob=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.embed_dropout = nn.Dropout(embedding_dropout)
        self.dropout = nn.Dropout(dropout)

        # Encoders
        self.gru = nn.GRU(embedding_dim, hidden_dim, batch_first=True,
                          bidirectional=True, num_layers=wide_layers, dropout=dropout if wide_layers > 1 else 0.0)
        self.prem_encoder = nn.GRU(
            embedding_dim, hidden_dim, num_layers=prem_layers,
            batch_first=True, bidirectional=True, dropout=dropout if prem_layers > 1 else 0.0
        )
        self.hypo_encoder = nn.GRU(
            embedding_dim, hidden_dim, num_layers=hypo_layers,
            batch_first=True, bidirectional=True, dropout=dropout if hypo_layers > 1 else 0.0
        )

        self.norm = nn.LayerNorm(hidden_dim * 2)


        # Self-attention for premise and hypothesis
        self.self_attn = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, 1)
        )
        # Cross-attention for the wide stream
        self.cross_attn_query = nn.Linear(hidden_dim * 2, hidden_dim)
        self.cross_attn_key = nn.Linear(hidden_dim * 2, hidden_dim)
        self.cross_attn_value = nn.Linear(hidden_dim * 2, hidden_dim)

        # Fusion and classification layers
        fusion_dim = hidden_dim * (4 + 2 + 2)  # 4H from cross + 2H + 2H = 8H total
        fc_layers = []
        in_dim = fusion_dim
        out_dim = hidden_dim * 2

        for i in range(num_fc_layers):
            fc_layers.extend([
                nn.Linear(in_dim, out_dim),
                nn.ReLU(),
                nn.Dropout(dropout)
            ])
            # for next layer
            in_dim = out_dim
            # Optionally shrink layer width gradually:
            out_dim = max(hidden_dim, out_dim // 2)

        self.fc_layers = nn.Sequential(*fc_layers)
        self.classifier = nn.Linear(in_dim, 2)
        self.unk_id = UNK_ID
        self.unk_prob = unk_prob


    def forward(self, x, mask=None):
        # Token corruption (word dropout)
        if self.training and self.unk_prob > 0:
            mask = (x != PAD_ID) & (x != SEP_ID) & (x != CLS_ID) & (x != UNK_ID)
            replace_mask = (torch.rand_like(x.float()) < self.unk_prob) & mask
            x = torch.where(replace_mask, torch.full_like(x, self.unk_id), x)

        # Split into premise/hypothesis 
        sep_positions = (x == SEP_ID).int().argmax(dim=1)
        batch_idx = torch.arange(x.size(0), device=x.device)
        token_range = torch.arange(x.size(1), device=x.device).unsqueeze(0)
        premise_mask = token_range < sep_positions.unsqueeze(1)
        hypothesis_mask = token_range > sep_positions.unsqueeze(1)
        premise = torch.where(premise_mask, x, PAD_ID)
        hypothesis = torch.where(hypothesis_mask, x, PAD_ID)

        #  Embeddings
        embedded = self.embed_dropout(self.embedding(x).transpose(1,2)).transpose(1,2)
        emb_prem = self.embed_dropout(self.embedding(premise).transpose(1,2)).transpose(1,2)
        emb_hypo = self.embed_dropout(self.embedding(hypothesis).transpose(1,2)).transpose(1,2)

        # Encode all three streams
        full_out, _ = self.gru(embedded)
        prem_out, _ = self.gru(emb_prem)
        hypo_out, _ = self.gru(emb_hypo)

        full_out = self.norm(full_out)
        prem_out, _ = self.prem_encoder(emb_prem)
        hypo_out, _ = self.hypo_encoder(emb_hypo)

        # # Wide stream (cross-attention over full sequence)
        # Q = self.cross_attn_query(full_out)
        # K = self.cross_attn_key(full_out)
        # V = self.cross_attn_value(full_out)
        # attn_scores = torch.bmm(Q, K.transpose(1, 2)) / (Q.size(-1) ** 0.5)
        # attn_weights = F.softmax(attn_scores, dim=-1)
        # cross_context = torch.bmm(attn_weights, V).mean(dim=1)  # (B, hidden_dim)

        # --- Bidirectional cross-attention between premise and hypothesis ---
        # Project to query/key/value spaces
        Q_p = self.cross_attn_query(prem_out)   # (B, Lp, H)
        K_h = self.cross_attn_key(hypo_out)
        V_h = self.cross_attn_value(hypo_out)

        Q_h = self.cross_attn_query(hypo_out)   # (B, Lh, H)
        K_p = self.cross_attn_key(prem_out)
        V_p = self.cross_attn_value(prem_out)

        # Premise → Hypothesis attention (H attends to P)
        scores_h2p = torch.bmm(Q_h, K_p.transpose(1, 2)) / (Q_h.size(-1) ** 0.5)
        weights_h2p = F.softmax(scores_h2p, dim=-1)
        context_h2p = torch.bmm(weights_h2p, V_p)  # (B, Lh, H)

        # Hypothesis → Premise attention (P attends to H)
        scores_p2h = torch.bmm(Q_p, K_h.transpose(1, 2)) / (Q_p.size(-1) ** 0.5)
        weights_p2h = F.softmax(scores_p2h, dim=-1)
        context_p2h = torch.bmm(weights_p2h, V_h)  # (B, Lp, H)

        # Pool both directions
        context_h2p = context_h2p.mean(dim=1)  # (B, H)
        context_p2h = context_p2h.mean(dim=1)  # (B, H)

        # Fuse bidirectional cross contexts
        cross_context = torch.cat([
            context_p2h,                   # P→H
            context_h2p,                   # H→P
            torch.abs(context_p2h - context_h2p),  # difference
            context_p2h * context_h2p              # element-wise product
        ], dim=1)  # (B, 4H)

        # Deep streams (self-attention pooling)
        def self_attention_pooling(out):
            weights = F.softmax(self.self_attn(out), dim=1)  # (B, L, 1)
            return torch.sum(weights * out, dim=1)           # (B, 2H)

        prem_context = self_attention_pooling(prem_out)
        hypo_context = self_attention_pooling(hypo_out)

        # Fusion
        fused = torch.cat([cross_context, prem_context, hypo_context], dim=1)
        fused = self.fc_layers(fused)
        logits = self.classifier(fused)

        return logits

In [None]:
contain_UNK = np.zeros(len(plain_valid), dtype=bool)
for i, sent in enumerate(plain_valid):
    if UNK_ID in sent:
        contain_UNK[i] = True

## Build the models


In [None]:
vocab_size = len(word2id)

In [None]:
def instantiate_models(gru=False, lstm=False, transformer=False, monstruosity=False):
    
    models = {} 

    # Build Gru Attention
    if gru:
        models['gru'] = GRUAttentionModel(
            vocab_size=vocab_size,
            embedding_dim=hyperparameters.embedding_dim,
            hidden_dim=hyperparameters.hidden_dim,
            dropout=hyperparameters.dropout,
            unk_prob=0.7,
            num_layers=hyperparameters.n_layers
        ).to(device)
    
    # Build LSTM with Attention
    if lstm:
        models['lstm'] = LSTMAttentionModel(
            vocab_size=vocab_size,
            embedding_dim=hyperparameters.embedding_dim,
            hidden_dim=hyperparameters.hidden_dim,
            dropout=hyperparameters.dropout,
            num_layers=hyperparameters.n_layers
        ).to(device)

    # Build Transformer 
    if transformer:
        models['transformer'] = TransformerClassifier(
            embedding_matrix=embedding_matrix,
            num_classes=2,
            n_heads=hyperparameters.n_heads,
            n_layers=hyperparameters.n_layers,
            dropout=hyperparameters.dropout
        ).to(device) 

    if monstruosity:
        models['monstruosity'] = GRUThreeStreamNLI(
            vocab_size=vocab_size,
            embedding_dim=hyperparameters.embedding_dim,
            hidden_dim=hyperparameters.hidden_dim, 
            wide_layers=2, 
            prem_layers=4, 
            hypo_layers=2, 
            num_fc_layers=2,
            embedding_dropout=0.2,
            dropout=0.5, 
            unk_prob=0.1
        ).to(device)

    return models 

In [None]:
models = instantiate_models(gru=True, lstm=True, transformer=True, monstruosity=True)
#transformer_model = instantiate_models(transformer=True)
#lstm_model = instantiate_models(lstm=True)
#monstruosity_model = instantiate_models(monstruosity=True)

## Train Models


### Training Global Function


In [None]:
def train_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    total_correct = 0
    total = 0

    for xb, mb, yb in loader: # x_train, train_mask, y_train
        xb,mb,yb = xb.to(device), mb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb, mb)
        loss = criterion(logits, yb)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)# Gradient clipping, prevents gradients to become to large 
        optimizer.step()

        total_loss += loss.item() * yb.size(0)
        preds = logits.argmax(1)
        total_correct += (preds == yb).sum().item()
        total += yb.size(0)
    return total_loss/total, total_correct/total

@torch.no_grad()
def evaluate_model(model, loader, criterion):
    model.eval()
    total_loss = 0.0
    total_correct = 0
    total = 0
    
    for xb, mb, yb in loader:
        xb, mb, yb = xb.to(device), mb.to(device), yb.to(device).long()
        out = model(xb, mb)
        logits = out[0] if isinstance(out, (tuple, list)) else out

        loss = criterion(logits, yb)
        batch_size = yb.size(0)
        total_loss += loss.item() * batch_size
        preds = logits.argmax(dim=1)
        total_correct += (preds == yb).sum().item()
        total += batch_size

    return total_loss / total, total_correct / total


def train_model(model, train_loader, valid_loader, optimizer, n_epochs=10, patience=3,criterion=nn.CrossEntropyLoss()):
    best_val_acc = 0.0
    epochs_no_improve = 0
    best_model_state = None

    history = {"train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}

    for epoch in range(n_epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer,criterion)
        val_loss, val_acc = evaluate_model(model, valid_loader, criterion)

        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc)

        print(f"Epoch {epoch+1}/{n_epochs} - "
              f"Train loss: {train_loss:.4f}, Train acc: {train_acc:.4f} - "
              f"Val loss: {val_loss:.4f}, Val acc: {val_acc:.4f}")

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_no_improve = 0
            best_model_state = copy.deepcopy(model.state_dict())
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print("Early stopping triggered.")
            break

    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, history

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def train_models(models_to_train):
    trained_models = {}
    histories = {}

    # Calculate class weights
    y_train_np = y_train.cpu().numpy()
    weights = compute_class_weight('balanced', classes=np.unique(y_train_np), y=y_train_np)
    criterion = nn.CrossEntropyLoss(weight=torch.tensor(weights, dtype=torch.float).to(device))

    # Iterate through the models that were passed in
    for name, model in models_to_train.items():
        print(f"--- Training {name.upper()} Model ---")
        model.to(device)
        optimizer = torch.optim.AdamW(model.parameters(), lr=hyperparameters.lr, weight_decay=hyperparameters.wd)
        
        trained_model, history = train_model(
            model, 
            train_loader, 
            valid_loader, 
            optimizer, 
            n_epochs=hyperparameters.train_num_epoch, 
            patience=5, 
            criterion=criterion
        )
        
        trained_models[name] = trained_model
        histories[name] = history
        print("\n")

    return trained_models, histories

In [None]:
# You can train one like : train_models(gru=True)

trained_models, histories = train_models(models)

## Validation


In [None]:
# Print model acuracy in samples with and without UNK tokens 
def analyze_unk_performance(model, data_loader, plain_data):
    model.eval()
    predictions = []
    all_labels = []

    # Get all predictions and labels
    with torch.no_grad():
        for inputs, masks, labels in data_loader:
            outputs = model(inputs.to(device), masks.to(device))
            preds = torch.max(outputs, dim=1)[1]
            predictions.extend(preds.cpu().numpy())
            all_labels.extend(labels.numpy())

    predictions = np.array(predictions)
    all_labels = np.array(all_labels)
    correct = (predictions == all_labels)

    # Check which original sentences contain the UNK_ID
    contain_UNK = np.array([UNK_ID in sent for sent in plain_data])

    # Calculate accuracy for sentences with UNK tokens
    unk_correct = correct[contain_UNK]
    unk_accuracy = unk_correct.mean() if len(unk_correct) > 0 else 0

    # Calculate accuracy for sentences without UNK tokens
    no_unk_correct = correct[~contain_UNK]
    no_unk_accuracy = no_unk_correct.mean() if len(no_unk_correct) > 0 else 0

    print("--- UNK Token Analysis ---")
    print(f"Accuracy on samples WITH [UNK] tokens:   {unk_accuracy:.4f} ({len(unk_correct)} samples)")
    print(f"Accuracy on samples WITHOUT [UNK] tokens: {no_unk_accuracy:.4f} ({len(no_unk_correct)} samples)")

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report

def model_validation(model):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for input_ids, attention_mask, labels in test_loader:
            input_ids      = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels         = labels.to(device)

            logits = model(input_ids, attention_mask)
            preds = logits.argmax(dim=1).cpu().numpy()

            all_preds.extend(preds)
            all_labels.extend(labels.cpu().numpy())

    print("Accuracy:", round(accuracy_score(all_labels, all_preds), 2))
    print(f"F1 Score: {f1_score(all_labels, all_preds):.2f}")
    print("Precision:", round(precision_score(all_labels, all_preds), 2))
    print("Recall:", round(recall_score(all_labels, all_preds), 2))
    print("-" * 40)
    print(classification_report(all_labels, all_preds, target_names=["neutral", "entails"]))

    print("\n")
    analyze_unk_performance(model, test_loader, plain_test)

In [None]:
def evaluate_all_models(trained_models):
    print("\n" + "="*60)
    print("FINAL MODEL EVALUATION ON TEST SET")
    print("="*60 + "\n")

    for model_name, model in trained_models.items():
        print(f"--- Evaluating {model_name.upper()} Model ---")
        model_validation(model)
        print("\n")

In [None]:
# Evaluate the models that were trained
evaluate_all_models(trained_models)

## Plots and visualizations


In [None]:
# Plot the trainning loss vs the validation loss
def plot_training_histories(histories):
    if not histories:
        print("No histories to plot.")
        return

    for model_name, history in histories.items():
        plt.figure(figsize=(8, 5))
        plt.plot(history['train_loss'], label='Training Loss', color='blue', marker='o')
        plt.plot(history['val_loss'], label='Validation Loss', color='orange', marker='o')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title(f'Training & Validation Loss for {model_name.upper()} Model')
        plt.legend()
        plt.grid(True)
        plt.show()

In [None]:
plot_training_histories(histories)

### Plot the Attention of the models


In [None]:
def plot_rnn_attention(tokens, attention_weights):
    # 1. Remove padding from tokens for a cleaner plot
    try:
        real_len = tokens.index('[PAD]')
        tokens_no_padding = tokens[:real_len]
        weights_no_padding = attention_weights.squeeze().cpu().numpy()[:real_len]
    except ValueError:
        tokens_no_padding = tokens
        weights_no_padding = attention_weights.squeeze().cpu().numpy()

    # 2. Create the bar plot
    plt.figure(figsize=(12, 6))
    sns.barplot(x=tokens_no_padding, y=weights_no_padding, palette='viridis')
    plt.title('RNN Attention Weights')
    plt.xlabel('Tokens')
    plt.ylabel('Attention Score')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

In [None]:
#Specific Plot for transformer
def plot_attention_head(attention_weights, tokens, head_index):
    """Plots a readable heatmap for a single attention head, ignoring padding."""
    
    # 1. Find the actual length of the sentence by finding the first [PAD] token
    try:
        real_len = tokens.index('[PAD]')
    except ValueError:
        real_len = len(tokens)

    # 2. Slice the tokens and weights to remove padding
    tokens_no_padding = tokens[:real_len]
    head_weights = attention_weights[0, head_index][:real_len, :real_len]
    
    # 3. Create the plot
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(
        head_weights, 
        xticklabels=tokens_no_padding, 
        yticklabels=tokens_no_padding, 
        cmap='viridis', 
        ax=ax
    )
    
    ax.set_title(f'Attention Head #{head_index}')
    plt.show()


In [None]:
# Master function to visualize attention
def visualize_attention(model_name, model, sample_token_ids, sample_mask):

    print(f"--- Visualizing Attention for {model_name.upper()} Model ---")
    
    # Convert token IDs back to words for plotting
    plot_tokens = [id2word[int(token_id)] for token_id in sample_token_ids]
    
    model.eval() # Set model to evaluation mode
    
    # Add a batch dimension for model input
    sample_token_ids = sample_token_ids.unsqueeze(0).to(device)
    sample_mask = sample_mask.unsqueeze(0).to(device)

    if model_name in ['gru', 'lstm']:
        # For RNNs, call forward with return_weights=True
        _, attention_weights = model(sample_token_ids, sample_mask, return_weights=True)
        plot_rnn_attention(plot_tokens, attention_weights.detach())
        
    elif model_name == 'transformer':
        # For Transformer, call the special get_attention_weights method
        # First, ensure the method exists
        if not hasattr(model, 'get_attention_weights'):
            print("Error: The Transformer model needs a 'get_attention_weights' method.")
            return
            
        attention_weights = model.get_attention_weights(sample_token_ids, sample_mask)
        
        # Plot the attention for each head
        num_heads = attention_weights.shape[1]
        for i in range(num_heads):
            plot_attention_head(attention_weights.detach().cpu().numpy(), plot_tokens, head_index=i)
    else:
        print(f"Attention visualization for model type '{model_name}' is not supported yet.")

In [None]:
# Select a sample to visualize (e.g., the 5th sentence in the training set)
sample_idx = 5
sample_tokens = x_train[sample_idx]
sample_mask = train_masks[sample_idx]

# Visualize attention for the trained GRU model
if 'gru' in trained_models:
    visualize_attention('gru', trained_models['gru'], sample_tokens, sample_mask)

if 'lstm' in trained_models:
    visualize_attention('lstm', trained_models['lstm'], sample_tokens, sample_mask)

# Visualize attention for the trained Transformer model
if 'transformer' in trained_models:
    visualize_attention('transformer', trained_models['transformer'], sample_tokens, sample_mask)