In [1]:
import json
import math
from collections import OrderedDict
import torch
from torch import nn, Tensor
from typing import Union, Tuple, List, Iterable, Dict
import torch.nn.functional as F
from torch.nn.parameter import Parameter
from torch.optim import AdamW
from torch.utils.data import DataLoader
#from scipy.stats import pearsonr, spearmanr
import numpy as np
import gzip, csv
import pandas as pd
from tqdm.auto import tqdm
import torch.nn.init as init
torch.manual_seed(0)
np.random.seed(0)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cpu


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def gelu(x):
    """Implementation of the gelu activation function."""
    return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))

In [3]:
class PositionalEncoding(nn.Module):

    def __init__(self, embed_dim: int, drop_rate=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=drop_rate)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, embed_dim, 2) * (-math.log(10000.0) / embed_dim))
        pe = torch.zeros(1, max_len, embed_dim)
        pe[0, :, 0::2] = torch.sin(position * div_term)
        pe[0, :, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """
        Args:
            x: Tensor, shape [batch_size, seq_len, embedding_dim]
        """
        x = x + self.pe[:, :x.size(1)]
        return self.dropout(x)

In [4]:
#def scaled_dot_product(q, k, v, attn_drop_rate=0.1, mask=None):
def scaled_dot_product(q, k, v, attn_drop_rate=0.1):
    """
    Parameters:
      q: query, shape: (batch, # heads, seq len, head dimension)
      k: keys, shape: (batch, # heads, seq len, head dimension)
      v: value, shape: (batch, # heads, seq len, head dimension)
      attn_drop_rate: probability of an element to be zeroed,
      mask: the optional masking of specific entries in the attention matrix.
              shape: (batch, seq len)
    """
    
    d_k = q.shape[-1]
    attn_logits = torch.matmul(q, k.transpose(-1, -2))
    attn_logits = attn_logits/math.sqrt(d_k)
    # if mask is not None:
    #   dummy_mask = torch.where(mask == 1.0, torch.tensor(True), torch.tensor(False))
    #   attn_logits = attn_logits.masked_fill(dummy_mask.unsqueeze(1).unsqueeze(1), float('-inf'))
    attention = F.softmax(attn_logits, dim=-1)
    attention = F.dropout(attention)
    values = torch.matmul(attention,v)
    return values

In [5]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_dim, n_heads, attn_drop_rate):
        super().__init__()
        self.embed_dim = embed_dim
        self.n_heads = n_heads
        self.head_dim = embed_dim // n_heads
        self.attn_drop_rate = attn_drop_rate
        self.query = nn.Linear(self.embed_dim, self.n_heads*self.head_dim)
        self.key = nn.Linear(self.embed_dim, self.n_heads*self.head_dim)
        self.value = nn.Linear(self.embed_dim, self.n_heads*self.head_dim)
        self.o_proj = nn.Linear(self.embed_dim, self.n_heads*self.head_dim)
        self._reset_parameters()

    def _reset_parameters(self):
      nn.init.xavier_uniform_(self.query.weight)
      self.query.bias.data.fill_(0)
      nn.init.xavier_uniform_(self.key.weight)
      self.key.bias.data.fill_(0)
      nn.init.xavier_uniform_(self.value.weight)
      self.value.bias.data.fill_(0)
      nn.init.xavier_uniform_(self.o_proj.weight)
      self.o_proj.bias.data.fill_(0)

    def split_heads(self, tensor):
       new_shape = tensor.size()[:-1] + (self.n_heads, self.head_dim)
       tensor = tensor.view(*new_shape)
       tensor = tensor.permute(0, 2, 1, 3).contiguous()
       return tensor
    
    def merge_heads(self, tensor, batch_size, seq_length):
       tensor = tensor.transpose(1, 2).contiguous().view(batch_size, seq_length, self.embed_dim)
       return tensor
    
    def forward(self, embedding):
       batch_size, seq_length, embed_dim = embedding.size()
       q, k, v = self.query(embedding), self.key(embedding), self.value(embedding)
       q = self.split_heads(q)
       k = self.split_heads(k)
       v = self.split_heads(v)
       values = scaled_dot_product(q, k, v, self.attn_drop_rate)
       values = self.merge_heads(values, batch_size, seq_length)
       attended_embeds = self.o_proj(values)

       return attended_embeds

    


In [6]:
class LayerNormalization(nn.Module):
    def __init__(self, parameters_shape, eps=1e-5):
        super().__init__()
        self.parameters_shape=parameters_shape
        self.eps=eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta =  nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        dims = [-(i + 1) for i in range(len(self.parameters_shape))]
        mean = inputs.mean(dim=dims, keepdim=True)
        print(f"Mean ({mean.size()})")
        var = ((inputs - mean) ** 2).mean(dim=dims, keepdim=True)
        std = (var + self.eps).sqrt()
        print(f"Standard Deviation  ({std.size()})")
        y = (inputs - mean) / std
        print(f"y: {y.size()}")
        out = self.gamma * y  + self.beta
        print(f"self.gamma: {self.gamma.size()}, self.beta: {self.beta.size()}")
        print(f"out: {out.size()}")
        return out

In [7]:
class PositionwiseFeedForward(nn.Module):

    def __init__(self, embed_dim, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(embed_dim, 4*embed_dim)
        self.linear2 = nn.Linear(4*embed_dim, embed_dim)
        self.relu = nn.GELU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        print(f"x after dropout: {x.size()}")
        x = self.linear2(x)
        print(f"x after 2nd linear layer: {x.size()}")
        return x

In [8]:
# class Classifier: #mewmew
class Classifier(nn.Module):
    def __init__(self, input_dim, numclasses, dropout_rate=0.1):
        super(Classifier, self).__init__()

        # Define layers
        self.fc1 = nn.Linear(input_dim, input_dim)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.fc2 = nn.Linear(input_dim, numclasses)

    def forward(self, x):
        # Forward pass
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [9]:
class EncoderLayer(nn.Module):

    def __init__(self, embed_dim, n_heads, attn_drop_rate, layer_drop_rate):
        super(EncoderLayer, self).__init__()
        self.embed_dim = embed_dim
        self.n_heads = n_heads
        self.attention = MultiHeadAttention(self.embed_dim, self.n_heads, attn_drop_rate)
        self.norm1 = LayerNormalization(parameters_shape=[self.embed_dim])
        self.dropout1 = nn.Dropout(p=layer_drop_rate)
        self.ffn = PositionwiseFeedForward(self.embed_dim,layer_drop_rate)
        self.norm2 = LayerNormalization(parameters_shape=[self.embed_dim])
        self.dropout2 = nn.Dropout(p=layer_drop_rate)

    def forward(self, x):
        residual_x = x
        print("------- ATTENTION 1 ------")
        x = self.attention(x)
        print("------- DROPOUT 1 ------")
        x = self.dropout1(x)
        print("------- ADD AND LAYER NORMALIZATION 1 ------")
        x = x + residual_x
        x = self.norm1(x)
        residual_x = x
        print("------- ATTENTION 2 ------")
        x = self.ffn(x)
        print("------- DROPOUT 2 ------")
        x = self.dropout2(x)
        print("------- ADD AND LAYER NORMALIZATION 2 ------")
        x = x + residual_x
        x = self.norm2(x)
        #add and norm switch refer to assignment
        return x

In [10]:
embed_dim = 16
n_heads = 4
attn_drop_rate = 0.1
layer_drop_rate = 0.1
block = EncoderLayer(embed_dim, n_heads, attn_drop_rate, layer_drop_rate)

bs = 3
seq_len = 2
embeds = torch.randn(bs, seq_len, embed_dim)
outputs = block(embeds)
out_bs, out_seq_len, out_hidden = outputs.shape
print("Output shape: ", (out_bs, out_seq_len, out_hidden))
assert out_bs == bs and out_seq_len == seq_len and out_hidden == embed_dim, "Unexpected output shape"

------- ATTENTION 1 ------
------- DROPOUT 1 ------
------- ADD AND LAYER NORMALIZATION 1 ------
Mean (torch.Size([3, 2, 1]))
Standard Deviation  (torch.Size([3, 2, 1]))
y: torch.Size([3, 2, 16])
self.gamma: torch.Size([16]), self.beta: torch.Size([16])
out: torch.Size([3, 2, 16])
------- ATTENTION 2 ------
x after dropout: torch.Size([3, 2, 64])
x after 2nd linear layer: torch.Size([3, 2, 16])
------- DROPOUT 2 ------
------- ADD AND LAYER NORMALIZATION 2 ------
Mean (torch.Size([3, 2, 1]))
Standard Deviation  (torch.Size([3, 2, 1]))
y: torch.Size([3, 2, 16])
self.gamma: torch.Size([16]), self.beta: torch.Size([16])
out: torch.Size([3, 2, 16])
Output shape:  (3, 2, 16)


In [11]:
class BERT(nn.Module): #Transformer
    def __init__(self, n_layers, vocab_size, embed_dim, n_heads, num_classes, attn_drop_rate, layer_drop_rate):
        super().__init__()
        self.embed = nn.Embedding(vocab_size+1, embed_dim)
        self.position = PositionalEncoding(embed_dim, layer_drop_rate)
        self.net = nn.Sequential(*[
        EncoderLayer(embed_dim, n_heads, attn_drop_rate, layer_drop_rate) for _ in range(n_layers)
        ])
        self.mask_pred = nn.Linear(embed_dim, vocab_size) ## Classifier
        self.classifier = Classifier(embed_dim, num_classes)
    def forward(self, batch_text):
        embedding = self.position(self.embed(batch_text))
        new_embedding = self.net((embedding))
        mask_preds = self.mask_pred(new_embedding)
        return mask_preds
        


In [12]:
embed_dim = 16
n_heads = 4
n_layers = 2
vocab_size = 10
attn_drop_rate = 0.1
layer_drop_rate = 0.1
num_classes=20
model = BERT(n_layers, vocab_size, embed_dim, n_heads, num_classes, attn_drop_rate, layer_drop_rate)

# bs = 3
# seq_len = 2
# inputs = torch.randint(0, vocab_size, (bs, seq_len))
# mask_preds = model(inputs)
# out_bs, out_seq_len, out_vocab = mask_preds.shape
# print("Mask predictions shape: ", (out_bs, out_seq_len, out_vocab))
# assert out_bs == bs and out_seq_len == seq_len and out_vocab == vocab_size, "Unexpected mask prediction output shape"

------- ATTENTION 1 ------
------- DROPOUT 1 ------
------- ADD AND LAYER NORMALIZATION 1 ------
Mean (torch.Size([3, 2, 1]))
Standard Deviation  (torch.Size([3, 2, 1]))
y: torch.Size([3, 2, 16])
self.gamma: torch.Size([16]), self.beta: torch.Size([16])
out: torch.Size([3, 2, 16])
------- ATTENTION 2 ------
x after dropout: torch.Size([3, 2, 64])
x after 2nd linear layer: torch.Size([3, 2, 16])
------- DROPOUT 2 ------
------- ADD AND LAYER NORMALIZATION 2 ------
Mean (torch.Size([3, 2, 1]))
Standard Deviation  (torch.Size([3, 2, 1]))
y: torch.Size([3, 2, 16])
self.gamma: torch.Size([16]), self.beta: torch.Size([16])
out: torch.Size([3, 2, 16])
------- ATTENTION 1 ------
------- DROPOUT 1 ------
------- ADD AND LAYER NORMALIZATION 1 ------
Mean (torch.Size([3, 2, 1]))
Standard Deviation  (torch.Size([3, 2, 1]))
y: torch.Size([3, 2, 16])
self.gamma: torch.Size([16]), self.beta: torch.Size([16])
out: torch.Size([3, 2, 16])
------- ATTENTION 2 ------
x after dropout: torch.Size([3, 2, 64]

In [13]:
from transformers import BertTokenizer
from datasets import load_dataset

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')  # Replace with your desired tokenizer

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, dataset, tokenizer):
        self.dataset = dataset
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        encoding = self.tokenizer(item['text'], truncation=True, padding='max_length', return_tensors='pt', max_length=512)
        encoding['label'] = torch.tensor(item['label'])
        return encoding


dataset = load_dataset("setfit/20_newsgroups")


Repo card metadata block was not found. Setting CardData to empty.


In [19]:
dataset = load_dataset("setfit/20_newsgroups")


Repo card metadata block was not found. Setting CardData to empty.


DatasetDict({
    train: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 11314
    })
    test: Dataset({
        features: ['text', 'label', 'label_text'],
        num_rows: 7532
    })
})

In [14]:
test = train_dataset.__getitem__(0)

In [15]:
test['input_ids'].shape

torch.Size([1, 512])

In [16]:
# seq_len = 512

In [17]:
test['input_ids']

tensor([[  101,  1045,  2001,  6603,  2065,  3087,  2041,  2045,  2071,  4372,
          7138,  2368,  2033,  2006,  2023,  2482,  1045,  2387,  1996,  2060,
          2154,  1012,  2009,  2001,  1037,  1016,  1011,  2341,  2998,  2482,
          1010,  2246,  2000,  2022,  2013,  1996,  2397, 20341,  1013,  2220,
         17549,  1012,  2009,  2001,  2170,  1037,  5318,  4115,  1012,  1996,
          4303,  2020,  2428,  2235,  1012,  1999,  2804,  1010,  1996,  2392,
         21519,  2001,  3584,  2013,  1996,  2717,  1997,  1996,  2303,  1012,
          2023,  2003,  2035,  1045,  2113,  1012,  2065,  3087,  2064,  2425,
          4168,  1037,  2944,  2171,  1010,  3194, 28699,  2015,  1010,  2086,
          1997,  2537,  1010,  2073,  2023,  2482,  2003,  2081,  1010,  2381,
          1010,  2030,  3649, 18558,  2017,  2031,  2006,  2023, 24151,  2559,
          2482,  1010,  3531,  1041,  1011,  5653,  1012,   102,     0,     0,
             0,     0,     0,     0,     0,     0,  