In [4]:
import pandas as pd
import numpy as np
import gensim
import torch
import torch.nn as nn
import torch.nn.utils.rnn
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from gensim.models import KeyedVectors

In [3]:
!pip install gensim

Collecting gensim
  Downloading gensim-4.4.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl.metadata (8.4 kB)
Downloading gensim-4.4.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (27.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m27.9/27.9 MB[0m [31m85.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: gensim
Successfully installed gensim-4.4.0


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
wv = KeyedVectors.load("/content/drive/MyDrive/Transformer-Model/word2vec-256-dim.kv")

df = pd.read_pickle("/content/drive/MyDrive/Transformer-Model/train_clean_tokens_and_labels_act.pkl")
df_val = pd.read_pickle("/content/drive/MyDrive/Transformer-Model/val_clean_tokens_and_labels_act.pkl")
df_test = pd.read_pickle("/content/drive/MyDrive/Transformer-Model/test_clean_tokens_and_labels_act.pkl")

# Recalculate max_len to consider all datasets (train, val, test) and add a buffer
max_len_train = max(len(s) for s in df["sentence"])
max_len_val = max(len(s) for s in df_val["sentence"])
max_len_test = max(len(s) for s in df_test["sentence"])
max_len = max(max_len_train, max_len_val, max_len_test) + 10 # Added a buffer of 10

df["act"] = df["act"] - 1
df_val["act"] = df_val["act"] - 1
df_test["act"] = df_test["act"] - 1

num_classes = df["act"].nunique()

alpha = 0.75
class_counts = df["act"].value_counts().sort_index().values
weights = torch.tensor((1.0 / class_counts)**alpha, dtype=torch.float32)
weights = weights / weights.sum()

In [None]:
num_classes

4

In [None]:
print("Train unique:", sorted(df["act"].unique()))
print("Val unique:", sorted(df_val["act"].unique()))
print("Test unique:", sorted(df_test["act"].unique()))

print("Train min/max:", df["act"].min(), df["act"].max())
print("Val min/max:", df_val["act"].min(), df_val["act"].max())
print("Test min/max:", df_test["act"].min(), df_test["act"].max())

print("Any NaN in train:", df["act"].isna().any())
print("Any NaN in val:", df_val["act"].isna().any())
print("Any NaN in test:", df_test["act"].isna().any())

Train unique: [np.int64(0), np.int64(1), np.int64(2), np.int64(3)]
Val unique: [np.int64(0), np.int64(1), np.int64(2), np.int64(3)]
Test unique: [np.int64(0), np.int64(1), np.int64(2), np.int64(3)]
Train min/max: 0 3
Val min/max: 0 3
Test min/max: 0 3
Any NaN in train: False
Any NaN in val: False
Any NaN in test: False


In [None]:
class SentenceDataset(torch.utils.data.Dataset):
    def __init__(self, df, wv):
        self.sentences = df["sentence"].tolist()
        self.labels = df['act'].tolist()
        self.wv = wv
        self.embedding_dim = wv.vector_size

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        tokens = self.sentences[idx]

        vectors = []
        for token in tokens:
            if token in self.wv:
                vectors.append(torch.tensor(self.wv[token], dtype=torch.float32))

        if len(vectors) == 0:
            vectors.append(torch.zeros(self.embedding_dim))

        sentence_tensor = torch.stack(vectors)   # (L, d_model)
        label = int(self.labels[idx])

        return sentence_tensor, label

In [None]:
def collate_fn(batch):
    # batch = [(sentence_tensor, label), ...]

    sentences, labels = zip(*batch)  # unzip

    lengths = torch.tensor([x.shape[0] for x in sentences])  # (B,)

    # Pad sentence tensors
    padded = pad_sequence(sentences, batch_first=True).float()  # (B, L, d_model)

    # Create attention mask
    max_len = padded.shape[1]
    mask = torch.arange(max_len, device=padded.device).expand(len(sentences), max_len)
    mask = mask >= lengths.unsqueeze(1)
    mask = mask.unsqueeze(1).unsqueeze(2)  # (B,1,1,L)
    mask = mask.float() * -1e9


    labels = torch.tensor(labels, dtype=torch.long)

    return padded, mask, labels

In [None]:
class PositionalEncoding(nn.Module):
    def __init__(self, max_len, d_model):
        super().__init__()

        PE = np.zeros((max_len, d_model))

        for pos in range(max_len):
            for i in range(d_model // 2):
                PE[pos, 2*i] = np.sin(pos / (10000 ** (2*i / d_model)))
                PE[pos, 2*i + 1] = np.cos(pos / (10000 ** (2*i / d_model)))

        # convert to tensor
        PE = torch.tensor(PE, dtype=torch.float32)

        # register as buffer (not parameter)
        self.register_buffer("PE", PE)

    def forward(self, x):

        # shape of x is (B, L, d_model)
        L = x.size(1)
        return x + self.PE[:L]

In [None]:
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.W_Q = nn.Linear(d_model, d_model, bias=False)
        self.W_K = nn.Linear(d_model, d_model, bias=False)
        self.W_V = nn.Linear(d_model, d_model, bias=False)
        self.W_O = nn.Linear(d_model, d_model, bias=False)

    def forward(self, x, mask=None):
        # x has a shape of (batch, sequence_length, model_dimension)
        B = x.shape[0]
        L = x.shape[1]

        Q = self.W_Q(x) # all are of the shape (B, L, d_model)
        K = self.W_K(x)
        V = self.W_V(x)


        # creating multiple heads from these single heads, ie. Q, K, V by
        # reshaping the shape to (B, L, num_heads, head_dim) where num_heads*head_dims
        # is equal to the d_model.
        Q = Q.reshape(B, L, self.num_heads, self.head_dim)
        K = K.reshape(B, L, self.num_heads, self.head_dim)
        V = V.reshape(B, L, self.num_heads, self.head_dim)

        # rearranging the dimensions to (B, num_heads, L, head_dim)
        Q = Q.permute(0, 2, 1, 3)
        K = K.permute(0, 2, 1, 3)
        V = V.permute(0, 2, 1, 3)

        # calculating the scaled attention scores
        scores = Q @ K.transpose(-2, -1) # dim = (B, num_heads, L, L)
        scores = scores / (self.head_dim**0.5)
        if mask is not None:
            scores = scores + mask
        attention_weights = torch.softmax(scores, dim=-1) #dim = (B, num_heads, L, L)
        out = attention_weights @ V

        # making the output dimensions equal to the input dimensions.
        out = out.permute(0, 2, 1, 3)
        out = out.reshape(B, L, self.d_model)

        # applying the output projection
        out = self.W_O(out)

        return out

In [None]:
class FeedForward(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(d_model, 4*d_model), # Changed from 2*d_model to 4*d_model for standard Transformer FFN
            nn.ReLU(),
            nn.Linear(4*d_model, d_model)  # Changed from 2*d_model to 4*d_model for standard Transformer FFN
        )

    def forward(self, x):
        out = self.network(x)
        return out

In [None]:
class EncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()

        self.attention = MultiHeadSelfAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):

        attn_out = self.attention(x, mask)

        x = self.layer_norm1(attn_out + x)

        ffn_out = self.feed_forward(x)

        x = self.layer_norm2(ffn_out + x)

        return x

In [None]:
class Encoder(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, max_len):
        super().__init__()
        self.num_layers = num_layers
        self.positional_encoding = PositionalEncoding(max_len, d_model)
        self.blocks = nn.ModuleList([EncoderBlock(d_model, num_heads) for i in range(num_layers)])

    def forward(self, x, mask=None):
        x = self.positional_encoding(x)

        for block in self.blocks:
            x = block(x, mask)
        return x

In [None]:
class SentenceActModel(nn.Module):
    def __init__(self, d_model, num_heads, num_layers, num_classes, max_len):
        super().__init__()

        # Encoder backbone
        self.encoder = Encoder(d_model, num_heads, num_layers, max_len=max_len)

        # Classification head
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x, mask):
        """
        x:    (B, L, d_model)
        mask: (B, 1, 1, L)
        """

        # Step 1: Encoder
        encoded = self.encoder(x, mask)  # (B, L, d_model)

        # Step 2: Prepare mask for pooling
        # mask == 0 → valid tokens
        valid_mask = (mask == 0).squeeze(1).squeeze(1)  # (B, L)

        # Step 3: Zero-out padding embeddings
        valid_mask = valid_mask.unsqueeze(-1)           # (B, L, 1)
        encoded = encoded * valid_mask                   # (B, L, d_model)

        # Step 4: Sum over tokens
        summed = encoded.sum(dim=1)                      # (B, d_model)

        # Step 5: Count real tokens
        lengths = valid_mask.sum(dim=1)                  # (B, 1)
        lengths = lengths.clamp(min=1)                   # avoid divide-by-zero

        # Step 6: Mean pooling
        sentence_embedding = summed / lengths            # (B, d_model)

        # Step 7: Classification
        logits = self.classifier(sentence_embedding)     # (B, num_classes)

        return logits

In [None]:
dataset = SentenceDataset(df, wv)
loader = DataLoader(dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

In [None]:
val_dataset = SentenceDataset(df_val, wv)
test_dataset = SentenceDataset(df_test, wv)

val_loader = DataLoader(val_dataset,batch_size=16,shuffle=False,collate_fn=collate_fn)

test_loader = DataLoader(test_dataset,batch_size=16,shuffle=False,collate_fn=collate_fn)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

model = SentenceActModel(
    d_model=256,
    num_heads=8,
    num_layers=6,
    num_classes=num_classes,
    max_len=max_len
).to(device)

criterion = nn.CrossEntropyLoss(weight=weights.to(device))
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

Using device: cuda


In [None]:
from sklearn.metrics import accuracy_score, f1_score

def evaluate(model, loader, criterion, device):
    model.eval()

    total_loss = 0.0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for padded_batch, mask, labels in loader:
            padded_batch = padded_batch.to(device)
            mask = mask.to(device)
            labels = labels.to(device)

            logits = model(padded_batch, mask)
            loss = criterion(logits, labels)
            total_loss += loss.item()

            preds = torch.argmax(logits, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(loader)
    acc = accuracy_score(all_labels, all_preds)
    macro_f1 = f1_score(all_labels, all_preds, average="macro")

    return avg_loss, acc, macro_f1

In [None]:
from sklearn.metrics import confusion_matrix, classification_report

num_epochs = 10

# =========================
# TRAINING + VALIDATION
# =========================
for epoch in range(num_epochs):

    model.train()
    total_loss = 0.0

    for step, (padded_batch, mask, labels) in enumerate(loader, start=1):
        padded_batch = padded_batch.to(device)
        mask = mask.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        logits = model(padded_batch, mask)
        loss = criterion(logits, labels)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(loader)

    # ---------- VALIDATION ----------
    val_loss, val_acc, val_f1 = evaluate(
        model, val_loader, criterion, device
    )

    print(
        f"Epoch [{epoch+1}/{num_epochs}] | "
        f"Train Loss: {avg_train_loss:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Val Acc: {val_acc:.4f} | "
        f"Val Macro F1: {val_f1:.4f}"
    )

# =========================
# FINAL TEST (RUN ONCE)
# =========================
test_loss, test_acc, test_f1 = evaluate(
    model, test_loader, criterion, device
)

print("\nFINAL TEST RESULTS")
print(f"Test Loss     : {test_loss:.4f}")
print(f"Test Accuracy : {test_acc:.4f}")
print(f"Test Macro F1 : {test_f1:.4f}")

# =========================
# CONFUSION MATRIX (TEST)
# =========================
model.eval()
y_true, y_pred = [], []

with torch.no_grad():
    for padded_batch, mask, labels in test_loader:
        padded_batch = padded_batch.to(device)
        mask = mask.to(device)

        logits = model(padded_batch, mask)
        preds = torch.argmax(logits, dim=1)

        y_pred.extend(preds.cpu().numpy())
        y_true.extend(labels.numpy())

print("\nConfusion Matrix:")
print(confusion_matrix(y_true, y_pred))

print("\nClassification Report:")
print(classification_report(y_true, y_pred))

Epoch [1/10] | Train Loss: 0.7670 | Val Loss: 0.7359 | Val Acc: 0.7356 | Val Macro F1: 0.6754
Epoch [2/10] | Train Loss: 0.6606 | Val Loss: 0.7044 | Val Acc: 0.7068 | Val Macro F1: 0.6774
Epoch [3/10] | Train Loss: 0.6215 | Val Loss: 0.6515 | Val Acc: 0.7535 | Val Macro F1: 0.7148
Epoch [4/10] | Train Loss: 0.5845 | Val Loss: 0.6571 | Val Acc: 0.7679 | Val Macro F1: 0.7201
Epoch [5/10] | Train Loss: 0.5404 | Val Loss: 0.6597 | Val Acc: 0.7551 | Val Macro F1: 0.7216
Epoch [6/10] | Train Loss: 0.4886 | Val Loss: 0.6610 | Val Acc: 0.7568 | Val Macro F1: 0.7228
Epoch [7/10] | Train Loss: 0.4301 | Val Loss: 0.7467 | Val Acc: 0.7795 | Val Macro F1: 0.7300
Epoch [8/10] | Train Loss: 0.3758 | Val Loss: 0.8269 | Val Acc: 0.7713 | Val Macro F1: 0.7280
Epoch [9/10] | Train Loss: 0.3258 | Val Loss: 0.9420 | Val Acc: 0.7659 | Val Macro F1: 0.7270
Epoch [10/10] | Train Loss: 0.2888 | Val Loss: 1.1618 | Val Acc: 0.7616 | Val Macro F1: 0.7140

FINAL TEST RESULTS
Test Loss     : 1.0047
Test Accuracy : 

In [None]:


import nltk
from nltk.tokenize import wordpunct_tokenize
import torch.nn.functional as F

In [None]:
def predict_act(sentence, model, wv, device, max_len):
    """
    sentence: str
    returns: predicted act id (int)
    """

    model.eval()

    # 1. tokenize
    tokens = wordpunct_tokenize(sentence.lower())

    # 2. word2vec lookup
    vectors = []
    for token in tokens:
        if token in wv:
            vectors.append(torch.tensor(wv[token], dtype=torch.float32))

    if len(vectors) == 0:
        vectors.append(torch.zeros(wv.vector_size))

    sentence_tensor = torch.stack(vectors)  # (L, d_model)

    # 3. add batch dimension
    sentence_tensor = sentence_tensor.unsqueeze(0).to(device)  # (1, L, d_model)

    # 4. create mask
    length = sentence_tensor.size(1)
    mask = torch.zeros(1, 1, 1, length, device=device)  # no padding → all valid

    # 5. forward pass
    with torch.no_grad():
        logits = model(sentence_tensor, mask)
        probs = F.softmax(logits, dim=1)

    pred_class = torch.argmax(probs, dim=1).item()

    return pred_class, probs.squeeze().cpu().numpy()

In [None]:
def predict_act_with_threshold(
    sentence,
    model,
    wv,
    device,
    max_len,
    threshold=0.45
):
    """
    Returns:
    - act class id OR 'UNCERTAIN'
    - probability vector
    """

    pred, probs = predict_act(sentence, model, wv, device, max_len)

    confidence = probs[pred]

    if confidence < threshold:
        return "UNCERTAIN", probs

    return pred, probs

In [None]:
sentence =  "Oh , sorry to hear that . This is quite unusual . I will look into the matter ."

In [None]:
pred, probs = predict_act(
    sentence,
    model,
    wv,
    device,
    max_len
)

print("Prediction:", pred)
print("Probabilities:", probs)

RecursionError: maximum recursion depth exceeded