In [11]:
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import dataset

import numpy as np
import matplotlib.pyplot as plt

'''
Binary classification task: toxic vs. not toxic.
'''
class PositionalEncoding(nn.Module):
    '''
    Adds fixed sinusoidal position encodings to a batch of embeddings, then applies dropout.

    d_model: embedding length (last dimension of the input); may be even or odd
    max_len: number of positions for which encodings are precomputed
    dropout_prob: dropout probability applied to the sum of input and encoding

    forward(x) expects x of shape (batch, seq_len, d_model) with seq_len <= max_len
    and returns a tensor of the same shape.
    '''
    def __init__(self, d_model, max_len=2048, dropout_prob=0.1):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_prob)

        position = torch.arange(max_len).unsqueeze(1)
        exp_term = torch.arange(0, d_model, 2)
        div_term = torch.exp(exp_term * (-math.log(10_000.0) / d_model))
        # One angle per (position, frequency) pair: shape (max_len, ceil(d_model / 2))
        angles = position * div_term

        pe = torch.zeros(1, max_len, d_model)
        pe[0, :, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd-indexed (cosine) column than even-indexed
        # (sine) columns, so trim the angles to the number of cosine columns.
        pe[0, :, 1::2] = torch.cos(angles[:, :d_model // 2])
        # Registered as a buffer: not a trainable parameter, but it follows the
        # module across devices and is stored in its state_dict.
        self.register_buffer('pe', pe)

    def forward(self, x):
        # Use only the encodings for the positions actually present in x
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)


class MultiHeadAttention(nn.Module):
    '''
    Multi-head scaled dot-product attention.

    d_k: length of each head's query/key/value vectors
    d_model: word embedding length (input and output feature size)
    n_heads: number of attention heads

    forward(q, k, v, mask=None):
        q: (N, T_q, d_model) queries
        k, v: (N, T_k, d_model) keys and values; T_k may differ from T_q
        mask: optional (N, T_k) tensor; key positions where mask == 0 are ignored
        returns: (N, T_q, d_model)

    Note: if every key position of a sample is masked out, the softmax runs over
    all -inf scores and yields NaN for that sample.
    '''
    def __init__(self, d_k, d_model, n_heads):
        super().__init__()

        self.d_k = d_k
        self.n_heads = n_heads

        # Each projection produces the vectors of all heads at once
        self.key = nn.Linear(d_model, d_k * n_heads)
        self.query = nn.Linear(d_model, d_k * n_heads)
        self.value = nn.Linear(d_model, d_k * n_heads)

        # Maps the concatenated head outputs back to the embedding size
        self.fc = nn.Linear(d_k * n_heads, d_model)

    def forward(self, q, k, v, mask=None):
        q = self.query(q)
        k = self.key(k)
        v = self.value(v)

        N = q.shape[0]
        T = q.shape[1]
        # Keys/values carry their own sequence length, which only equals T
        # in the self-attention case.
        T_kv = k.shape[1]

        # (N, T, n_heads * d_k) -> (N, n_heads, T, d_k)
        q = q.view(N, T, self.n_heads, self.d_k).transpose(1, 2)
        k = k.view(N, T_kv, self.n_heads, self.d_k).transpose(1, 2)
        v = v.view(N, T_kv, self.n_heads, self.d_k).transpose(1, 2)

        # (N, n_heads, T, T_kv) similarity scores, scaled by sqrt(d_k)
        attn_scores = q @ k.transpose(-2, -1) / math.sqrt(self.d_k)
        if mask is not None:
            # Broadcast the (N, T_kv) mask over heads and query positions
            attn_scores = attn_scores.masked_fill(mask[:, None, None, :] == 0, float('-inf'))
        attn_weights = F.softmax(attn_scores, dim=-1)

        A = attn_weights @ v

        # (N, n_heads, T, d_k) -> (N, T, n_heads * d_k): concatenate the heads
        A = A.transpose(1, 2)
        A = A.contiguous().view(N, T, self.d_k * self.n_heads)

        return self.fc(A)

class TransformerBlock(nn.Module):
    '''
    One encoder layer: self-attention and a feed-forward network, each followed
    by a residual connection and layer normalisation, with dropout on the output.

    d_k: length of each attention head's vectors
    d_model: embedding length
    n_heads: number of attention heads
    dropout_prob: dropout probability used in the feed-forward network and on the output
    '''
    def __init__(self, d_k, d_model, n_heads, dropout_prob=0.1):
        super().__init__()

        hidden_dim = 4 * d_model  # the feed-forward network widens by a factor of four

        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.mha = MultiHeadAttention(d_k, d_model, n_heads)
        feed_forward_layers = [
            nn.Linear(d_model, hidden_dim),
            nn.GELU(),
            nn.Linear(hidden_dim, d_model),
            nn.Dropout(dropout_prob),
        ]
        self.ann = nn.Sequential(*feed_forward_layers)
        self.dropout = nn.Dropout(dropout_prob)

    def forward(self, x, mask=None):
        # Self-attention sub-layer: queries, keys and values all come from x
        attended = self.mha(x, x, x, mask)
        after_attention = self.ln1(x + attended)

        # Position-wise feed-forward sub-layer
        transformed = self.ann(after_attention)
        after_ffn = self.ln2(after_attention + transformed)

        return self.dropout(after_ffn)
    
class Encoder(nn.Module):
    '''
    Transformer encoder classifier: token embedding, positional encoding, a stack of
    transformer blocks, then layer norm and a linear head on the first token.

    vocab_size: number of rows in the embedding table
    max_len: number of positions covered by the positional encoding
    d_k, d_model, n_heads: forwarded to every TransformerBlock
    n_layers: number of transformer blocks
    n_classes: size of the output logits
    dropout_prob: dropout probability for the positional encoding and the blocks
    '''
    def __init__(self, vocab_size, max_len, d_k, d_model, n_heads, n_layers, n_classes, dropout_prob=0.1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoding = PositionalEncoding(d_model, max_len, dropout_prob)
        self.transformer_blocks = nn.Sequential(
            *(TransformerBlock(d_k, d_model, n_heads, dropout_prob) for _ in range(n_layers))
        )
        self.ln = nn.LayerNorm(d_model)
        self.fc = nn.Linear(d_model, n_classes)

    def forward(self, x, mask=None):
        hidden = self.pos_encoding(self.embedding(x))
        # The blocks are iterated by hand because each one also needs the mask,
        # which nn.Sequential's own forward would not pass along.
        for layer in self.transformer_blocks:
            hidden = layer(hidden, mask)
        # Classify from the representation at sequence position 0
        first_token = hidden[:, 0, :]
        return self.fc(self.ln(first_token))

In [2]:
from transformers import AutoTokenizer

# Run on the GPU when CUDA is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Pretrained tokenizer; its vocabulary size is used further down to size the model's embedding table
tokenizer = AutoTokenizer.from_pretrained('distilbert-base-cased')

In [3]:
from transformers import DataCollatorWithPadding
from datasets import load_dataset, load_from_disk
import torch.optim as optim
from torch.utils.data import DataLoader
import os

# Tokenization and DataLoader preparation
def tokenize_function(examples):
    """Tokenize the 'sentence' field of the given examples, truncating over-long inputs."""
    sentences = examples['sentence']
    return tokenizer(sentences, truncation=True)

# Build and cache the tokenized dataset on first use; later runs reload it from disk
if not os.path.isdir("tokenized_dataset"):
    ds = load_dataset("glue", "sst2")
    tokenized_ds = ds.map(tokenize_function, batched=True)
    tokenized_ds.save_to_disk("tokenized_dataset")
else:
    tokenized_ds = load_from_disk("tokenized_dataset")

# Drop the raw text and index columns and expose the target under the name "labels"
tokenized_ds = (
    tokenized_ds
    .remove_columns(['sentence', 'idx'])
    .rename_column("label", "labels")
)

# The collator pads every batch, since truncated sentences still differ in length
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# PyTorch DataLoaders: only the training split is shuffled
train_loader = DataLoader(
    tokenized_ds['train'],
    batch_size=32,
    shuffle=True,
    collate_fn=data_collator,
)
val_loader = DataLoader(
    tokenized_ds['validation'],
    batch_size=32,
    collate_fn=data_collator,
)

Map:   0%|          | 0/872 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/67349 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/872 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/1821 [00:00<?, ? examples/s]

In [4]:
# Display the splits, remaining columns and row counts of the tokenized dataset
tokenized_ds

DatasetDict({
    train: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 67349
    })
    validation: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 872
    })
    test: Dataset({
        features: ['labels', 'input_ids', 'attention_mask'],
        num_rows: 1821
    })
})

In [6]:
# Sanity check: print the shape of every tensor in the first validation batch only
for batch in val_loader:
    for k, v in batch.items():
        print("k:", k, "v.shape", v.shape)  
    break

k: labels v.shape torch.Size([32])
k: input_ids v.shape torch.Size([32, 51])
k: attention_mask v.shape torch.Size([32, 51])


In [7]:
from transformers import AutoConfig
# Look up the maximum number of positions of the pretrained checkpoint; this value is
# passed to the Encoder below as max_len for its positional encoding table
config = AutoConfig.from_pretrained('distilbert-base-cased')
config.max_position_embeddings

512

In [12]:
# Small encoder classifier; keyword arguments make each hyperparameter explicit
model = Encoder(
    vocab_size=tokenizer.vocab_size,
    max_len=config.max_position_embeddings,
    d_k=16,
    d_model=64,
    n_heads=4,
    n_layers=2,
    n_classes=2,
    dropout_prob=0.1,
)
model.to(device)

Encoder(
  (embedding): Embedding(28996, 64)
  (pos_encoding): PositionalEncoding(
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (transformer_blocks): Sequential(
    (0): TransformerBlock(
      (ln1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (ln2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
      (mha): MultiHeadAttention(
        (key): Linear(in_features=64, out_features=64, bias=True)
        (query): Linear(in_features=64, out_features=64, bias=True)
        (value): Linear(in_features=64, out_features=64, bias=True)
        (fc): Linear(in_features=64, out_features=64, bias=True)
      )
      (ann): Sequential(
        (0): Linear(in_features=64, out_features=256, bias=True)
        (1): GELU(approximate='none')
        (2): Linear(in_features=256, out_features=64, bias=True)
        (3): Dropout(p=0.1, inplace=False)
      )
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (1): TransformerBlock(
      (ln1): LayerNorm((64,), eps=1e-05, 

In [13]:
# Smoke test: run one sentence through the model. In notebook order this cell comes
# before the training cell, so the prediction reflects the model as it is at this point.
text = "you are a winner"
model.eval()  # switch off dropout for inference

# Tokenize the input text, padded/truncated to exactly 512 tokens
encoding = tokenizer(
    text,
    add_special_tokens=True,
    max_length=512,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

# Move the token ids and the padding mask to the same device as the model
input_ids = encoding['input_ids'].to(device)
attention_mask = encoding['attention_mask'].to(device)

with torch.inference_mode():
    output = model(input_ids, mask=attention_mask)
    # torch.max over dim=1 returns (max values, their indices); the index is the predicted class
    _, prediction = torch.max(output, dim=1)
    print(prediction)

tensor([0], device='cuda:0')


In [14]:
# Configuration
epochs = 4
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# Per-epoch average losses; "test" here refers to the validation split (val_loader)
train_losses = np.zeros(epochs)
test_losses = np.zeros(epochs)

# Training loop
for it in range(epochs):
    model.train()  # enable dropout

    train_loss = 0
    n_train = 0

    # Training phase
    for batch in train_loader:
        labels = batch['labels'].to(device)
        inputs = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)

        # Clear gradients left over from the previous step before the new backward pass
        optimizer.zero_grad()
        outputs = model(inputs, mask=attention_mask)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        # Accumulate loss and count samples
        # (the criterion returns the batch mean, so weight it by the batch size)
        train_loss += loss.item() * inputs.size(0)
        n_train += inputs.size(0)

    # Average training loss for this epoch
    train_loss /= n_train

    # Evaluation phase
    model.eval()  # disable dropout
    test_loss = 0
    n_test = 0

    # No gradients are needed while evaluating
    with torch.inference_mode():
        for batch in val_loader:
            labels = batch['labels'].to(device)
            inputs = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)

            outputs = model(inputs, mask=attention_mask)
            loss = criterion(outputs, labels)

            # Accumulate loss and count samples
            test_loss += loss.item() * inputs.size(0)
            n_test +=inputs.size(0)

    # Average test loss for this epoch
    test_loss /= n_test

    # Record losses
    train_losses[it] = train_loss
    test_losses[it] = test_loss

    # Print epoch summary
    print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

Epoch 1/4, Train Loss: 0.6549, Test Loss: 0.5915
Epoch 2/4, Train Loss: 0.5750, Test Loss: 0.5352
Epoch 3/4, Train Loss: 0.5235, Test Loss: 0.5166
Epoch 4/4, Train Loss: 0.4861, Test Loss: 0.5165


In [15]:
def compute_accuracy(loader):
    """Return the fraction of examples in `loader` that `model` classifies correctly.

    Uses the notebook-level `model` and `device`. Each batch must provide
    'labels', 'input_ids' and 'attention_mask' tensors.
    """
    n_correct = 0
    n_total = 0
    # Evaluation needs no gradients; inference mode avoids building autograd
    # graphs for every batch, which saves memory and time.
    with torch.inference_mode():
        for batch in loader:
            labels = batch['labels'].to(device)
            inputs = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            outputs = model(inputs, mask=attention_mask)
            # Index of the largest logit is the predicted class
            _, predictions = torch.max(outputs, 1)
            n_correct += (predictions == labels).sum().item()
            n_total += labels.shape[0]
    return n_correct / n_total


model.eval()  # disable dropout so predictions are deterministic
train_acc = compute_accuracy(train_loader)
# "Test" accuracy is measured on the validation split
test_acc = compute_accuracy(val_loader)
print(f"Train acc: {train_acc:.4f}, Test acc: {test_acc:.4f}")

Train acc: 0.8068, Test acc: 0.7557
