<a href="https://colab.research.google.com/github/Irfan-Alaam/TransformerBasedProductSentimentAnalyzer/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os, bz2, re, string
import numpy as np
from collections import Counter
import kagglehub
# Download the Amazon reviews dataset through kagglehub; `path` is the
# location returned by the download call and is used as the base directory.
path = kagglehub.dataset_download("bittlingmayer/amazonreviews")
print("Dataset downloaded to:", path)

# bz2-compressed train/test files expected inside the downloaded directory.
train_file = os.path.join(path, "train.ft.txt.bz2")
test_file  = os.path.join(path, "test.ft.txt.bz2")

# Text cleaning + tokenizer
def clean_text(text):
    """Normalise a raw review string and split it into word tokens.

    The text is lower-cased, literal ``<br />`` tags are replaced by a space,
    every character listed in ``string.punctuation`` is removed, and the
    result is split on whitespace.

    Returns:
        A list of token strings (empty for empty or punctuation-only input).
    """
    lowered = text.lower().replace("<br />", " ")
    # One translate pass drops all ASCII punctuation characters.
    without_punct = lowered.translate(str.maketrans("", "", string.punctuation))
    return without_punct.split()

# Load FastText format (label + review)
def load_data(file, limit=None):
    """Read a bz2-compressed ``<label> <review>`` text file.

    Each line is stripped and split at the first space into a label and a
    review. Lines without a space are skipped. The label ``__label__1`` is
    mapped to 0 and every other label to 1. The review is tokenised with
    ``clean_text``.

    Args:
        file: Path of the bz2 file, opened in text mode as UTF-8.
        limit: Optional cap on the number of *lines* read; skipped lines
            count towards it. A falsy value (``None`` or 0) disables the cap.

    Returns:
        ``(reviews, labels)`` where ``reviews`` is a list of token lists and
        ``labels`` is the parallel list of 0/1 integers.
    """
    reviews = []
    labels = []
    with bz2.open(file, "rt", encoding="utf-8") as handle:
        for line_no, raw_line in enumerate(handle):
            if limit and line_no >= limit:
                break
            label_tag, separator, review_text = raw_line.strip().partition(" ")
            if not separator:
                # No space on the line: there is no review part to use.
                continue
            labels.append(0 if label_tag == "__label__1" else 1)
            reviews.append(clean_text(review_text))
    return reviews, labels

# Read only the first 60,000 training lines and 6,000 test lines so the
# pure-NumPy model below can be trained in a reasonable time.
print("Loading small subset for testing...")
train_reviews, train_labels = load_data(train_file, limit=60000)
test_reviews, test_labels   = load_data(test_file, limit=6000)

# Build Vocabulary
def build_vocab(token_lists, vocab_size=20000):
    """Map the most frequent tokens to integer IDs.

    Token frequencies are counted over all lists in ``token_lists``. The
    ``vocab_size - 2`` most common tokens receive IDs 2, 3, ... in order of
    decreasing frequency. ID 0 is assigned to ``<PAD>`` and ID 1 to ``<UNK>``.

    Returns:
        A dict from token string to integer ID.
    """
    frequencies = Counter()
    for review_tokens in token_lists:
        frequencies.update(review_tokens)

    vocab = {}
    for token_id, (word, _count) in enumerate(frequencies.most_common(vocab_size - 2), start=2):
        vocab[word] = token_id

    # Reserved IDs are written last, so they win over any identical token.
    vocab["<PAD>"] = 0
    vocab["<UNK>"] = 1
    return vocab
# build_vocab counts how often each word appears in the token lists, keeps the
# vocab_size - 2 most frequent words (19,998 here, because two IDs are reserved
# for <PAD> and <UNK>) and assigns each kept word an integer ID starting at 2.
vocab = build_vocab(train_reviews, vocab_size=20000)
print("Vocab size:", len(vocab))

# Encode + Pad
def encode_and_pad(tokens, vocab, max_len=100):
    """Convert tokens to a fixed-length list of vocabulary IDs.

    Tokens missing from ``vocab`` are replaced by the ``<UNK>`` ID. Sequences
    shorter than ``max_len`` are right-padded with the ``<PAD>`` ID; longer
    ones are truncated to their first ``max_len`` IDs.

    Returns:
        A list of integer IDs.
    """
    encoded = [vocab.get(token, vocab["<UNK>"]) for token in tokens]
    shortfall = max_len - len(encoded)
    if shortfall > 0:
        return encoded + [vocab["<PAD>"]] * shortfall
    return encoded[:max_len]

# Encode every review with the default max_len of encode_and_pad (100), which
# gives rectangular int32 matrices; labels become int32 vectors.
X_train = np.array([encode_and_pad(t, vocab) for t in train_reviews], dtype=np.int32)
X_test  = np.array([encode_and_pad(t, vocab) for t in test_reviews], dtype=np.int32)
y_train = np.array(train_labels, dtype=np.int32)
y_test  = np.array(test_labels, dtype=np.int32)

# Sanity check of the resulting array shapes.
print("Train shape:", X_train.shape, y_train.shape)
print("Test shape :", X_test.shape, y_test.shape)


Downloading from https://www.kaggle.com/api/v1/datasets/download/bittlingmayer/amazonreviews?dataset_version_number=7...


100%|██████████| 493M/493M [00:06<00:00, 79.1MB/s]

Extracting files...





Dataset downloaded to: /root/.cache/kagglehub/datasets/bittlingmayer/amazonreviews/versions/7
Loading small subset for testing...
Vocab size: 20000
Train shape: (60000, 100) (60000,)
Test shape : (6000, 100) (6000,)


In [None]:
def softmax(x):
    """Numerically stable softmax over the last axis of ``x``.

    The maximum along the last axis is subtracted before exponentiating so
    that large logits do not overflow. Normalising over ``axis=-1`` gives the
    same result as before for 1-D and 2-D inputs. It also makes 3-D inputs
    such as ``(batch, query, key)`` attention scores sum to 1 over the key
    axis; the earlier ``axis=1`` normalised them over the query axis.

    Args:
        x: NumPy array of logits with at least one dimension.

    Returns:
        Array of the same shape as ``x`` whose last-axis slices sum to 1.
    """
    shifted = x - np.max(x, axis=-1, keepdims=True)  # keeps exp() in range
    exps = np.exp(shifted)
    return exps / np.sum(exps, axis=-1, keepdims=True)

def layer_norm(x, eps=1e-6):
    """Standardise ``x`` along its last axis.

    Each last-axis slice is shifted to zero mean and divided by
    ``sqrt(variance + eps)``. No learnable scale or bias is applied.

    Args:
        x: NumPy array.
        eps: Small constant added to the variance to avoid division by zero.

    Returns:
        Array with the same shape as ``x``.
    """
    centered = x - np.mean(x, axis=-1, keepdims=True)
    variance = np.var(x, axis=-1, keepdims=True)
    return centered / np.sqrt(variance + eps)

def cross_entropy_loss(logits, labels):
    """Mean negative log-likelihood of the true classes.

    Args:
        logits: Array indexed as ``[sample, class]``.
        labels: Integer array with one class index per sample.

    Returns:
        The average of ``-log(p_true + 1e-12)`` over the batch, where
        ``p_true`` is the softmax probability of each sample's label.
    """
    n_samples = labels.shape[0]
    class_probs = softmax(logits)
    true_class_probs = class_probs[np.arange(n_samples), labels]
    # The 1e-12 offset guards against log(0).
    return np.mean(-np.log(true_class_probs + 1e-12))

class Adam:
    """Adam optimizer that updates a list of NumPy parameter arrays in place."""

    def __init__(self, params, lr=0.001, betas=(0.9,0.999), eps=1e-8):
        """Store hyper-parameters and allocate zeroed moment buffers.

        Args:
            params: List of arrays to optimise; they are modified in place.
            lr: Learning rate.
            betas: Decay rates for the first and second moment estimates.
            eps: Constant added to the denominator of the update.
        """
        self.params = params
        self.lr = lr
        self.betas = betas
        self.eps = eps
        self.t = 0  # number of step() calls so far
        self.m = [np.zeros_like(p) for p in params]  # first-moment estimates
        self.v = [np.zeros_like(p) for p in params]  # second-moment estimates

    def step(self, grads):
        """Apply one bias-corrected Adam update.

        ``grads`` is paired positionally with ``self.params`` via ``zip``, so
        parameters beyond ``len(grads)`` are left untouched.
        """
        self.t += 1
        beta1, beta2 = self.betas
        m_correction = 1 - beta1 ** self.t
        v_correction = 1 - beta2 ** self.t
        for idx, (param, grad) in enumerate(zip(self.params, grads)):
            first_moment = beta1 * self.m[idx] + (1 - beta1) * grad
            second_moment = beta2 * self.v[idx] + (1 - beta2) * (grad ** 2)
            self.m[idx] = first_moment
            self.v[idx] = second_moment
            m_hat = first_moment / m_correction
            v_hat = second_moment / v_correction
            # In-place subtraction so the model's own arrays are updated.
            param -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps)


In [None]:
# 1. (Token Embedding)
class Embedding:
    """Lookup table that maps token IDs to ``d_model``-dimensional vectors."""

    def __init__(self, vocab_size, d_model):
        """Create a ``(vocab_size, d_model)`` table of scaled Gaussian values."""
        self.vocab_size, self.d_model = vocab_size, d_model
        # Standard-normal draws divided by sqrt(vocab_size).
        table = np.random.randn(vocab_size, d_model)
        self.weight = table / np.sqrt(vocab_size)

    def forward(self, x):
        """Return the rows of ``weight`` selected by the index array ``x``."""
        rows = self.weight[x]
        return rows


# 2. Positional Encoding (sinusoidal)
def positional_encoding(seq_len, d_model):
    """Build the sinusoidal positional-encoding matrix.

    For position ``p`` and column ``c`` the angle is
    ``p / 10000 ** (2 * (c // 2) / d_model)``. Even columns hold the sine of
    that angle and odd columns the cosine.

    Returns:
        Float array of shape ``(seq_len, d_model)``.
    """
    positions = np.arange(seq_len)[:, np.newaxis]
    columns = np.arange(d_model)[np.newaxis, :]
    inv_freq = 1 / np.power(10000, (2 * (columns // 2)) / d_model)
    angles = positions * inv_freq
    # Select sine for even columns and cosine for odd ones in a single pass.
    return np.where(columns % 2 == 0, np.sin(angles), np.cos(angles))

# 3. Scaled Dot-Product Attention
def scaled_dot_product_attention(Q, K, V, mask=None):
    """
    Compute attention weights from Q and K and apply them to V.

    Q,K,V: (batch_size, seq_len, d_k)
    mask: optional, broadcastable to the (batch, seq, seq) score tensor;
          positions where mask == 0 get a score of -1e9 before the softmax
    returns: output, attention_weights

    NOTE(review): the weights are whatever the module-level ``softmax``
    returns for the 3-D score tensor. Attention expects normalisation over
    the key (last) axis; confirm ``softmax`` does that for 3-D inputs.
    """
    scale = np.sqrt(Q.shape[-1])
    logits = np.matmul(Q, K.transpose(0, 2, 1)) / scale  # (batch, seq, seq)

    if mask is not None:
        # A large negative score drives masked positions towards zero weight.
        logits = np.where(mask == 0, -1e9, logits)

    weights = softmax(logits)
    return np.matmul(weights, V), weights
# 4. Multi-Head Attention
class MultiHeadAttention:
    """Multi-head self-attention with four square, bias-free projections."""

    def __init__(self, d_model, num_heads):
        """Create the Q/K/V/output projection matrices.

        ``d_model`` must be divisible by ``num_heads``; each head works on
        ``d_k = d_model // num_heads`` features.
        """
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        # Gaussian initialisation scaled by 1/sqrt(d_model) for every matrix.
        scale = np.sqrt(d_model)
        self.Wq = np.random.randn(d_model, d_model) / scale
        self.Wk = np.random.randn(d_model, d_model) / scale
        self.Wv = np.random.randn(d_model, d_model) / scale
        self.Wo = np.random.randn(d_model, d_model) / scale

    def split_heads(self, x):
        """Reshape (batch, seq_len, d_model) into (batch, heads, seq_len, d_k)."""
        n_batch, n_tokens, _ = x.shape
        per_head = x.reshape(n_batch, n_tokens, self.num_heads, self.d_k)
        return np.transpose(per_head, (0, 2, 1, 3))

    def combine_heads(self, x):
        """Inverse of ``split_heads``: (batch, heads, seq_len, d_k) -> (batch, seq_len, heads*d_k)."""
        n_batch, n_heads, n_tokens, head_dim = x.shape
        token_major = np.transpose(x, (0, 2, 1, 3))
        return token_major.reshape(n_batch, n_tokens, n_heads * head_dim)

    def forward(self, x):
        """Apply self-attention to ``x`` of shape (batch, seq_len, d_model).

        Q, K and V are all projected from the same input, attention is run
        for each head separately without a mask, and the concatenated head
        outputs are projected with ``Wo``.
        """
        q_heads = self.split_heads(np.matmul(x, self.Wq))
        k_heads = self.split_heads(np.matmul(x, self.Wk))
        v_heads = self.split_heads(np.matmul(x, self.Wv))

        # Attention per head; only the output (not the weights) is kept.
        head_outputs = [
            scaled_dot_product_attention(q_heads[:, h, :, :], k_heads[:, h, :, :], v_heads[:, h, :, :])[0]
            for h in range(q_heads.shape[1])
        ]

        merged = self.combine_heads(np.stack(head_outputs, axis=1))
        return np.matmul(merged, self.Wo)

# 5. Feedforward Network
class FeedForward:
    """Position-wise two-layer network: linear -> ReLU -> linear."""

    def __init__(self, d_model, d_ff):
        """Create ``d_model -> d_ff -> d_model`` weights with zero biases.

        Each weight matrix is Gaussian, scaled by 1/sqrt of its input width.
        """
        self.W1 = np.random.randn(d_model, d_ff) / np.sqrt(d_model)
        self.b1 = np.zeros(d_ff)
        self.W2 = np.random.randn(d_ff, d_model) / np.sqrt(d_ff)
        self.b2 = np.zeros(d_model)

    def forward(self, x):
        """Apply the network along the last axis of ``x``."""
        pre_activation = np.matmul(x, self.W1) + self.b1
        hidden = np.maximum(0, pre_activation)  # ReLU
        return np.matmul(hidden, self.W2) + self.b2

# 6. Transformer Encoder Block
class EncoderLayer:
    """One encoder block: self-attention and feed-forward, each followed by
    a residual connection and layer normalisation (post-norm)."""

    def __init__(self, d_model, num_heads, d_ff):
        """Create the attention and feed-forward sub-layers."""
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForward(d_model, d_ff)

    def forward(self, x):
        """Run the block on ``x`` and return an array of the same shape."""
        # Sub-layer 1: multi-head attention + residual, then layer norm.
        attended = layer_norm(x + self.mha.forward(x))
        # Sub-layer 2: feed-forward + residual, then layer norm.
        return layer_norm(attended + self.ffn.forward(attended))


In [None]:

# 1. Transformer Encoder (stacked layers)
class TransformerEncoder:
    """Encoder-only transformer classifier.

    Pipeline: token embedding + sinusoidal positions -> ``num_layers``
    encoder blocks -> mean pooling over the sequence -> linear classifier.
    """

    def __init__(self, vocab_size, seq_len, d_model=64, num_heads=4, d_ff=128, num_layers=2, num_classes=2):
        """Store the hyper-parameters and create all sub-modules and weights."""
        # Hyper-parameters are kept on the instance (the checkpoint code reads them).
        self.vocab_size = vocab_size
        self.seq_len = seq_len
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_ff = d_ff
        self.num_layers = num_layers
        self.num_classes = num_classes

        # Input representation: learned token table plus fixed position table.
        self.embedding = Embedding(vocab_size, d_model)
        self.pos_encoding = positional_encoding(seq_len, d_model)

        # Encoder stack.
        self.enc_layers = []
        for _ in range(num_layers):
            self.enc_layers.append(EncoderLayer(d_model, num_heads, d_ff))

        # Classification head.
        self.W_out = np.random.randn(d_model, num_classes) / np.sqrt(d_model)
        self.b_out = np.zeros(num_classes)

    def forward(self, x):
        """
        x: (batch_size, seq_len) token indices
        returns: logits (batch_size, num_classes)
        """
        hidden = self.embedding.forward(x)
        # Positions are broadcast over the batch axis, so the sequence
        # length of x has to match the stored positional table.
        hidden += self.pos_encoding[np.newaxis, :, :]

        for encoder in self.enc_layers:
            hidden = encoder.forward(hidden)

        pooled = hidden.mean(axis=1)  # (batch, d_model)
        return np.matmul(pooled, self.W_out) + self.b_out

# Hyper-parameters for the model instance used in the rest of the notebook.
vocab_size = len(vocab)
seq_len = 100  # same value as the default max_len of encode_and_pad
d_model = 64
num_heads = 4
d_ff = 128
num_layers = 2
num_classes = 2  # number of output classes: 0 = negative, 1 = positive

model = TransformerEncoder(vocab_size, seq_len, d_model, num_heads, d_ff, num_layers, num_classes)

# Forward pass example on the first 8 training reviews (untrained weights),
# just to check the output shapes.
batch_input = X_train[:8]
logits = model.forward(batch_input)
probs = softmax(logits)
print("Logits shape:", logits.shape)
print("Probabilities shape:", probs.shape)
print("Sample probabilities:", probs)



Logits shape: (8, 2)
Probabilities shape: (8, 2)
Sample probabilities: [[0.61638124 0.38361876]
 [0.61638537 0.38361463]
 [0.61549842 0.38450158]
 [0.61563032 0.38436968]
 [0.61621955 0.38378045]
 [0.61590473 0.38409527]
 [0.61563877 0.38436123]
 [0.61528692 0.38471308]]


In [None]:
# 1. Prediction helper, followed by the Adam optimizer setup
# (the trainable parameters are collected right after `predict`)
def predict(model, x):
    """Classify a batch of encoded reviews.

    Args:
        model: Object whose ``forward(x)`` returns per-class logits.
        x: Batch of token-ID sequences accepted by ``model.forward``.

    Returns:
        ``(preds, probs)``: the argmax class index per sample and the
        softmax probabilities it was taken from.
    """
    class_probs = softmax(model.forward(x))
    return np.argmax(class_probs, axis=1), class_probs
# Trainable arrays in a fixed order: embedding table and classifier first.
# Gradients passed to optimizer.step() are matched to this list by position.
params = [model.embedding.weight, model.W_out, model.b_out]

# Include encoder layer params
for layer in model.enc_layers:
    mha = layer.mha
    params += [mha.Wq, mha.Wk, mha.Wv, mha.Wo]
    ffn = layer.ffn
    params += [ffn.W1, ffn.b1, ffn.W2, ffn.b2]
# One Adam instance holds every parameter array; each step() call updates as
# many of them (from the front of the list) as it receives gradients for.
optimizer = Adam(params, lr=0.001)

# 2. Helper functions
def relu_grad(x):
    """Return the ReLU derivative of ``x``: 1.0 where ``x > 0``, else 0.0 (float32)."""
    is_positive = x > 0
    return is_positive.astype(np.float32)

# 3. Compute loss + gradients
def compute_loss_and_grads_full(model, x_batch, y_batch):
    """
    Forward pass plus an approximate backward pass for one mini-batch.

    The loss and the classifier gradients are exact. The gradient that
    reaches the embedding table is an approximation: it is the mean-pooling
    gradient plus a term pushed back through the feed-forward network of the
    last encoder layer only. Attention, layer normalisation and earlier
    layers are treated as identity.

    Only gradients for the embedding table, ``W_out`` and ``b_out`` are
    returned, in that order. No gradients are produced for the encoder
    layers' own weights.

    NOTE(review): the FFN pre-activation is recomputed from the *input* of
    the last encoder layer, whereas ``EncoderLayer.forward`` feeds the FFN
    with the layer-normalised attention output. This is kept as-is because
    it is part of the existing approximation.

    Args:
        model: TransformerEncoder instance.
        x_batch: Integer array of token IDs, shape (batch, seq_len).
        y_batch: Integer array of class labels, shape (batch,).

    Returns:
        ``(loss, grads)`` with ``grads = [dEmbedding, dW_out, db_out]``.
    """
    batch, seq_len = x_batch.shape
    d_model = model.d_model

    # ----------------- Forward pass -----------------
    # Embedding + positional encoding; forward() returns a fresh array for
    # integer index arrays, so the in-place add does not touch the table.
    x = model.embedding.forward(x_batch)  # (batch, seq_len, d_model)
    x += model.pos_encoding[np.newaxis, :, :]

    last_layer_input = x  # input seen by the final encoder layer
    for layer in model.enc_layers:
        last_layer_input = x
        x = layer.forward(x)

    pooled = np.mean(x, axis=1)  # (batch, d_model)
    logits = np.matmul(pooled, model.W_out) + model.b_out
    loss = cross_entropy_loss(logits, y_batch)

    # ----------------- Backward pass -----------------
    # d(loss)/d(logits) for mean cross-entropy: (softmax - one_hot) / batch.
    dlogits = softmax(logits)
    dlogits[np.arange(batch), y_batch] -= 1
    dlogits /= batch

    # Classifier gradients.
    dW_out = np.matmul(pooled.T, dlogits)
    db_out = np.sum(dlogits, axis=0)

    # Mean pooling spreads the pooled gradient evenly over the positions.
    dpooled = np.matmul(dlogits, model.W_out.T)  # (batch, d_model)
    demb_seq = np.tile(dpooled[:, np.newaxis, :], (1, seq_len, 1)) / seq_len

    # ----------------- Backprop through last encoder layer FFN -----------------
    ffn = model.enc_layers[-1].ffn
    hidden = np.matmul(last_layer_input, ffn.W1) + ffn.b1  # (batch, seq_len, d_ff)
    relu_mask = hidden.reshape(-1, ffn.W1.shape[1]) > 0  # (batch*seq_len, d_ff)
    dx_flat = demb_seq.reshape(-1, d_model)  # (batch*seq_len, d_model)

    dhidden = np.matmul(dx_flat, ffn.W2.T) * relu_mask
    dx_ffn = np.matmul(dhidden, ffn.W1.T)  # (batch*seq_len, d_model)
    demb = dx_ffn.reshape(batch, seq_len, d_model) + demb_seq  # residual path

    # Scatter-add each position's gradient into the row of its token ID.
    # np.add.at accumulates correctly when a token occurs more than once.
    dEmbedding = np.zeros_like(model.embedding.weight)
    np.add.at(dEmbedding, x_batch, demb)

    grads = [dEmbedding, dW_out, db_out]

    return loss, grads

# 4. Training Loop
# Mini-batch training configuration.
batch_size = 32
epochs = 3

for epoch in range(epochs):
    # Reshuffle the training set at the start of every epoch.
    perm = np.random.permutation(len(X_train))
    X_train_shuffled = X_train[perm]
    y_train_shuffled = y_train[perm]

    epoch_loss = 0
    correct = 0

    for i in range(0, len(X_train), batch_size):
        x_batch = X_train_shuffled[i:i+batch_size]
        y_batch = y_train_shuffled[i:i+batch_size]

        loss, grads = compute_loss_and_grads_full(model, x_batch, y_batch)
        optimizer.step(grads)
        # Weight the batch-mean loss by the batch size so the epoch value
        # below is a per-sample average even if the last batch is smaller.
        epoch_loss += loss * len(x_batch)

        # Accuracy (measured with a second forward pass, i.e. after the
        # parameter update for this batch has been applied)
        preds, _ = predict(model, x_batch)
        correct += np.sum(preds == y_batch)

    epoch_loss /= len(X_train)
    accuracy = correct / len(X_train)
    print(f"Epoch {epoch+1}/{epochs} - Loss: {epoch_loss:.4f} - Accuracy: {accuracy:.4f}")

Epoch 1/3 - Loss: 0.5091 - Accuracy: 0.7665
Epoch 2/3 - Loss: 0.2959 - Accuracy: 0.8905
Epoch 3/3 - Loss: 0.2456 - Accuracy: 0.9086


In [None]:
import pickle

def save_checkpoint(model, vocab, path="transformer_checkpoint.pkl"):
    """Pickle the model's weights, hyper-parameters and vocabulary to ``path``.

    The file holds a dict with two keys: ``"model_params"`` (embedding table,
    one list per encoder weight with an entry for every layer, classifier
    weights and the constructor hyper-parameters) and ``"vocab"``.

    Args:
        model: Object exposing the TransformerEncoder attributes read below.
        vocab: Vocabulary object stored unchanged next to the parameters.
        path: Destination file; it is overwritten if it exists.
    """
    layers = model.enc_layers

    def collect(component, attr):
        # One entry per encoder layer, in layer order.
        return [getattr(getattr(layer, component), attr) for layer in layers]

    model_params = {
        "embedding_weight": model.embedding.weight,
        "enc_Wq": collect("mha", "Wq"),
        "enc_Wk": collect("mha", "Wk"),
        "enc_Wv": collect("mha", "Wv"),
        "enc_Wo": collect("mha", "Wo"),
        "enc_ffn_W1": collect("ffn", "W1"),
        "enc_ffn_b1": collect("ffn", "b1"),
        "enc_ffn_W2": collect("ffn", "W2"),
        "enc_ffn_b2": collect("ffn", "b2"),
        "W_out": model.W_out,
        "b_out": model.b_out,
        "vocab_size": model.vocab_size,
        "seq_len": model.seq_len,
        "d_model": model.d_model,
        "num_heads": model.num_heads,
        "d_ff": model.d_ff,
        "num_layers": model.num_layers,
        "num_classes": model.num_classes
    }
    payload = {"model_params": model_params, "vocab": vocab}

    with open(path, "wb") as out_file:
        pickle.dump(payload, out_file)
    print(f"Checkpoint saved to {path}")

# Save
# Write the trained parameters and vocabulary to the default checkpoint file.
save_checkpoint(model, vocab)


# Evaluate on the held-out test subset, batch by batch. `all_preds` keeps the
# per-sample predictions for the metrics cell further down; `batch_size` is
# the value defined for the training loop.
all_preds = []
correct = 0
for i in range(0, len(X_test), batch_size):
    x_batch = X_test[i:i+batch_size]
    y_batch = y_test[i:i+batch_size]
    preds, _ = predict(model, x_batch)
    all_preds.extend(preds)
    correct += np.sum(preds == y_batch)

accuracy = correct / len(X_test)
print(f"Test Accuracy: {accuracy:.4f}")


Checkpoint saved to transformer_checkpoint.pkl
Test Accuracy: 0.8845


In [None]:
# ------------------- Phase 6 -------------------
import matplotlib.pyplot as plt

def compare_attention(model, pos_tokens, neg_tokens, vocab, max_len=100):
    """Plot the last encoder layer's attention maps for two reviews side by side.

    For every attention head one figure is shown, with the heat-map of the
    first review on the left ("Positive") and the second on the right
    ("Negative"). Each review is encoded and padded to ``max_len``, embedded,
    and passed through all encoder layers *before* the last one, so that the
    last layer's Q/K projections see the same input as in ``model.forward``.

    Only the rows and columns of real tokens are drawn. Padding positions are
    cropped from the image but still take part in the softmax, because no
    attention mask is applied.

    Args:
        model: Trained TransformerEncoder.
        pos_tokens: Token list of the first review.
        neg_tokens: Token list of the second review.
        vocab: Token -> ID mapping containing ``<PAD>`` and ``<UNK>``.
        max_len: Padded sequence length; longer reviews are truncated. It
            must not exceed the length of ``model.pos_encoding``.
    """
    mha = model.enc_layers[-1].mha

    def get_attn_weights(tokens):
        """Return (per-head attention matrices, number of real tokens)."""
        tokens = list(tokens)[:max_len]  # truncate so the shapes always match
        ids = [vocab.get(tok, vocab["<UNK>"]) for tok in tokens]
        ids += [vocab["<PAD>"]] * (max_len - len(ids))
        x = np.array([ids])

        hidden = model.embedding.forward(x) + model.pos_encoding[np.newaxis, :x.shape[1], :]
        # Reproduce the input of the last layer: run every earlier layer.
        for layer in model.enc_layers[:-1]:
            hidden = layer.forward(hidden)

        Qh = mha.split_heads(np.matmul(hidden, mha.Wq))
        Kh = mha.split_heads(np.matmul(hidden, mha.Wk))

        scale = np.sqrt(mha.d_k)
        attn_weights = []
        for h in range(mha.num_heads):
            scores = np.matmul(Qh[0, h], Kh[0, h].T) / scale  # (seq, seq)
            attn_weights.append(softmax(scores))
        return attn_weights, len(tokens)

    def draw(ax, weights, n_tokens, labels, title):
        """Render one cropped attention matrix with token tick labels."""
        ax.imshow(weights[:n_tokens, :n_tokens], cmap='viridis')
        ax.set_xticks(range(n_tokens))
        ax.set_yticks(range(n_tokens))
        ax.set_xticklabels(labels[:n_tokens], rotation=90)
        ax.set_yticklabels(labels[:n_tokens])
        ax.set_title(title)

    pos_attn, pos_len = get_attn_weights(pos_tokens)
    neg_attn, neg_len = get_attn_weights(neg_tokens)

    for h in range(len(pos_attn)):
        fig, axes = plt.subplots(1,2, figsize=(15,6))
        draw(axes[0], pos_attn[h], pos_len, pos_tokens, f"Head {h+1} Positive")
        draw(axes[1], neg_attn[h], neg_len, neg_tokens, f"Head {h+1} Negative")
        plt.show()

# Example
# Two short hand-written reviews, split on whitespace only (clean_text is not
# applied here), used to compare the attention maps.
positive_review = "this product is disgustingly good".split()
negative_review = "the device works absolutely shit".split()
compare_attention(model, positive_review, negative_review, vocab)


In [None]:
import numpy as np

def confusion_matrix(y_true, y_pred):
    """
    Returns confusion matrix for binary classification
    [[TP, FN],
     [FP, TN]]

    Rows are the true class (1, then 0) and columns the predicted class
    (1, then 0). Both arguments are NumPy arrays of 0/1 labels.
    """
    actual_pos, actual_neg = (y_true == 1), (y_true == 0)
    predicted_pos, predicted_neg = (y_pred == 1), (y_pred == 0)
    return np.array([
        [np.sum(actual_pos & predicted_pos), np.sum(actual_pos & predicted_neg)],
        [np.sum(actual_neg & predicted_pos), np.sum(actual_neg & predicted_neg)],
    ])
def classification_metrics(y_true, y_pred):
    """Compute the confusion matrix and the usual binary metrics.

    Class 1 is treated as the positive class.

    Returns:
        ``(cm, accuracy, precision, recall, f1)`` where ``cm`` is laid out as
        ``[[TP, FN], [FP, TN]]``.
    """
    cm = confusion_matrix(y_true, y_pred)
    (tp, fn), (fp, tn) = cm

    accuracy = (tp + tn) / (tp + tn + fp + fn)
    # The 1e-12 terms keep the ratios finite when a denominator is zero.
    precision = tp / (tp + fp + 1e-12)
    recall = tp / (tp + fn + 1e-12)
    f1 = 2 * (precision * recall) / (precision + recall + 1e-12)

    return cm, accuracy, precision, recall, f1
# Predictions collected by the test-set evaluation loop in the earlier cell.
all_preds = np.array(all_preds)  # from Phase 5
y_true = y_test

cm, accuracy, precision, recall, f1 = classification_metrics(y_true, all_preds)

print("Confusion Matrix:\n", cm)
print(f"Accuracy : {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall   : {recall:.4f}")
print(f"F1-score : {f1:.4f}")
import matplotlib.pyplot as plt

# Heat-map of the confusion matrix. cm is [[TP, FN], [FP, TN]], so row and
# column 0 correspond to the positive class and row and column 1 to the
# negative class, which is the order of the tick labels below.
plt.figure(figsize=(5,4))
plt.imshow(cm, cmap='Blues')
plt.title("Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.xticks([0,1], ["Positive","Negative"])
plt.yticks([0,1], ["Positive","Negative"])
# Write each count into its cell (x is the column index, y the row index).
for i in range(2):
    for j in range(2):
        plt.text(j, i, cm[i,j], ha='center', va='center', color='red')
plt.show()
