In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import load_dataset
from torch.utils.data import DataLoader, Dataset
import math



  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class BaseAttention(nn.Module):
    def __init__(self):
        super().__init__()
    
    def forward(self, q, k, v, mask=None):
        d_k = q.size(-1)
        attn = (q @ k.transpose(-2, -1)) * (d_k**-0.5)
        if mask is not None:
            attn = attn.masked_fill(mask==0, float('-inf'))
        attn = F.softmax(attn, dim=-1)
        output = attn @ v

        return output, attn

        


In [3]:
class MultiHeadAttention(nn.Module):
    def __init__(self, emb_dim, n_heads, dropout=0.1):
        super().__init__()
        assert emb_dim % n_heads == 0
        self.d_k = emb_dim // n_heads
        self.emb_dim = emb_dim
        self.n_heads = n_heads

        self.wq = nn.Linear(emb_dim, emb_dim)
        self.wk = nn.Linear(emb_dim, emb_dim)
        self.wv = nn.Linear(emb_dim, emb_dim)
        self.wo = nn.Linear(emb_dim, emb_dim)

        self.attn = BaseAttention()
        self.dropout = nn.Dropout(dropout)

    def forward(self, q, k, v, mask=None):
        batch_size = q.size(0)

        Q = self.wq(q)
        K = self.wk(k)
        V = self.wv(v)

        Q = Q.view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)

        out, attn_weights = self.attn(Q, K, V, mask)

        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.n_heads*self.d_k)

        out = self.wo(out)

        return out, attn_weights


In [4]:
class TBlock(nn.Module):
    def __init__(self, emb_dim, n_heads, dropout):
        super().__init__()
        self.emb_dim = emb_dim
        self.n_heads = n_heads

        self.mha = MultiHeadAttention(emb_dim, n_heads)
        self.ln1 = nn.LayerNorm(emb_dim)
        self.ln2 = nn.LayerNorm(emb_dim)
        self.ff = nn.Sequential(
            nn.Linear(emb_dim, emb_dim*2),
            nn.GELU(),
            nn.Linear(emb_dim*2, emb_dim)
        )
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_out, _ = self.mha(x, x, x, mask)
        x = self.ln1(x + self.dropout(attn_out))
        ff_out = self.ff(x)
        x = self.ln2(x + self.dropout(ff_out))
        return x

In [5]:
class p_enc(nn.Module):
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        pos = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div = torch.exp(torch.arange(0, d_model, 2).float()*(-math.log(10000)/d_model))

        pe[:,0::2] = torch.sin(pos*div)
        pe[:,1::2] = torch.cos(pos*div)

        pe = pe.unsqueeze(0)
        self.register_buffer("pe", pe)

    def forward(self, x):
        seq_len = x.size(1)
        x = x + self.pe[:, :seq_len]
        return self.dropout(x)

In [6]:
class T_Encoder(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_layers, dropout=0.1):
        super().__init__()
        self.d_model = d_model
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = p_enc(d_model, dropout=dropout)

        self.layers = nn.ModuleList([
            TBlock(d_model, n_heads, dropout)
            for _ in range(n_layers)
        ])
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        x = self.embedding(x) * math.sqrt(self.d_model)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


In [7]:
class SentimentClassifier(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, num_classes=2, max_len=512, dropout=0.2):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = p_enc(d_model, max_len=max_len, dropout=dropout)

        self.layers = nn.ModuleList([
            TBlock(d_model, num_heads, dropout=dropout)
            for _ in range(num_layers)
        ])
        self.norm = nn.LayerNorm(d_model)
        self.classifier = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        x = self.embedding(x)
        x = self.pos_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        x = self.norm(x)
        x = x.mean(dim=1) 
        return self.classifier(x)


In [8]:
imdb = load_dataset('imdb')

In [9]:
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
MAX_LEN = 256

def tokenize(batch):
    tokens = tokenizer(
        batch["text"], 
        padding="max_length", 
        truncation=True, 
        max_length=MAX_LEN, 
        return_tensors="pt"
    )
    batch["input_ids"] = tokens["input_ids"]
    batch["attention_mask"] = tokens["attention_mask"]
    return batch

imdb = imdb.map(tokenize, batched=True)

Map: 100%|██████████| 50000/50000 [00:28<00:00, 1760.38 examples/s]
Map: 100%|██████████| 50000/50000 [00:28<00:00, 1760.38 examples/s]


In [10]:
class HuggingFaceDataset(Dataset):
    def __init__(self, hf_dataset):
        self.dataset = hf_dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        return self.dataset[idx]

In [11]:
def collate_fn(batch):
    # Convert each element to torch.tensor
    input_ids = torch.tensor([item["input_ids"] for item in batch])
    attention_mask = torch.tensor([item["attention_mask"] for item in batch])
    labels = torch.tensor([item["label"] for item in batch])
    return input_ids, attention_mask, labels

train_ds = HuggingFaceDataset(imdb["train"])
test_ds = HuggingFaceDataset(imdb["test"])

train_loader = DataLoader(imdb["train"], batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(imdb["test"], batch_size=32, shuffle=False, collate_fn=collate_fn)

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'using device: {device}')
model = T_Encoder(vocab_size=tokenizer.vocab_size, d_model=128, n_heads=8, n_layers=2).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

using device: cuda


In [17]:
from tqdm import tqdm

for epoch in range(3):  # small for demo
    model.train()
    total_loss = 0
    for input_ids, mask, labels in tqdm(train_loader):
        input_ids, mask, labels = input_ids.to(device), mask.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(input_ids, mask.unsqueeze(1).unsqueeze(2))
        seq_out = outputs.mean(dim=1)
        loss = criterion(seq_out, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"Epoch {epoch+1}, Loss: {total_loss/len(train_loader):.4f}")


100%|██████████| 782/782 [01:06<00:00, 11.71it/s]
100%|██████████| 782/782 [01:06<00:00, 11.71it/s]


Epoch 1, Loss: 0.9397


100%|██████████| 782/782 [01:09<00:00, 11.26it/s]
100%|██████████| 782/782 [01:09<00:00, 11.26it/s]


Epoch 2, Loss: 0.5768


100%|██████████| 782/782 [01:04<00:00, 12.21it/s]

Epoch 3, Loss: 0.5086





In [None]:
import random

sample_indices = random.sample(range(len(test_ds)), 5)
samples = [test_ds[i] for i in sample_indices]

input_ids = torch.tensor([s["input_ids"] for s in samples]).to(device)
mask = torch.tensor([s["attention_mask"] for s in samples]).to(device)
labels = torch.tensor([s["label"] for s in samples]).to(device)


In [19]:
model.eval()
with torch.no_grad():
    outputs, attn_weights = model(input_ids, mask)
    preds = torch.argmax(outputs, dim=1)

for i, s in enumerate(samples):
    print(f"Review: {s['text'][:200]}...")  # print first 200 chars
    print(f"True label: {labels[i].item()}, Predicted: {preds[i].item()}")
    print("-"*80)


RuntimeError: The size of tensor a (5) must match the size of tensor b (256) at non-singleton dimension 2