In [1]:
import torch
import torch.nn as nn

In [3]:
# trying RoBERTa implementation
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        if d_model % 2 == 1:
            pe[:, 1::2] = torch.cos(position * div_term[:-1])
        else:
            pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :].to(x.device)

class myRoBERTa(nn.Module):
    def __init__(self, vocab_size, H = 768, A = 12, L = 12, feed_forward_dim = 3072 , dropout = 0.1): # H, H/64, H/64, 4*H
        super().__init__()
        self.embeddings = nn.Embedding(vocab_size, H, padding_idx=0)
        self.position_embeddings = PositionalEncoding(H)
        encoder = nn.TransformerEncoderLayer(d_model= H, nhead=A, dim_feedforward=feed_forward_dim, dropout=dropout,
                                            activation="gelu", batch_first=True, layer_norm_eps=1e-5, 
                                            norm_first=True, bias = True)
        self.encoder = nn.TransformerEncoder(encoder_layer=encoder, 
                                            num_layers=L, norm=nn.LayerNorm(H), enable_nested_tensor=True, 
                                            mask_check=True)
        self.fc = nn.Linear(H, vocab_size)  
    def forward(self, x):
        x = self.embeddings(x)
        x = self.position_embeddings(x)
        x = self.encoder(x)
        x = self.fc(x)
        return x
        


In [7]:
import torch
import torch.nn as nn
import math

class MyRoBERTa(nn.Module):
    def __init__(self, vocab_size, H=768, A=12, L=12, feed_forward_dim=3072, dropout=0.1, max_len=512, padding_idx=0):
        super().__init__()
        self.padding_idx = padding_idx
        
        # 1. Token Embeddings
        self.embeddings = nn.Embedding(vocab_size, H, padding_idx=padding_idx)
        
        # 2. Learned Positional Embeddings (Standard for RoBERTa/BERT)
        # RoBERTa usually reserves indices for padding, so max_len + padding_idx + 1 is often used safe-guard
        self.position_embeddings = nn.Embedding(max_len + padding_idx + 1, H, padding_idx=padding_idx)
        
        # 3. Layer Norm and Dropout for embeddings
        self.LayerNorm = nn.LayerNorm(H, eps=1e-5)
        self.dropout = nn.Dropout(dropout)

        # 4. Encoder
        # Note: RoBERTa uses GELU. norm_first=True is 'Pre-LN' (Better stability), 
        # norm_first=False is 'Post-LN' (Original BERT/RoBERTa). 
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=H, 
            nhead=A, 
            dim_feedforward=feed_forward_dim, 
            dropout=dropout,
            activation="gelu", 
            batch_first=True, 
            layer_norm_eps=1e-5, 
            norm_first=True 
        )
        
        # FIX: Passed num_layers=L as a keyword argument
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer, 
            num_layers=L, 
            norm=nn.LayerNorm(H)
        )
        
        # 5. Output Head
        self.fc = nn.Linear(H, vocab_size)
        
        # 6. Weight Tying (Critical for RoBERTa/BERT performance)
        # The embedding weights and output layer weights are shared.
        self.fc.weight = self.embeddings.weight

    def forward(self, input_ids):
        # input_ids shape: (Batch, Seq_Len)
        seq_len = input_ids.size(1)
        
        # Create position IDs (0, 1, 2, ... seq_len-1)
        # In actual RoBERTa, usually padding is ignored in pos_ids, but this is a simplified version
        position_ids = torch.arange(seq_len, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
        
        # Get Embeddings
        token_emb = self.embeddings(input_ids)
        pos_emb = self.position_embeddings(position_ids)
        
        x = token_emb + pos_emb
        x = self.LayerNorm(x)
        x = self.dropout(x)
        
        # Create Padding Mask
        # PyTorch Transformer expects: (Batch, Seq_Len) where True = Ignored/Padding
        src_key_padding_mask = (input_ids == self.padding_idx)
        
        # Pass through Encoder
        x = self.encoder(x, src_key_padding_mask=src_key_padding_mask)
        
        # Output Head
        x = self.fc(x)
        return x

# Test the implementation
if __name__ == "__main__":
    vocab_size = 1000
    model = MyRoBERTa(vocab_size=vocab_size)
    
    # Create dummy input with padding (0)
    x = torch.randint(1, vocab_size, (2, 10)) # Batch 2, Seq 10
    x[0, 8:] = 0 # Add padding to first sentence
    
    output = model(x)
    print(f"Output shape: {output.shape}") # Should be [2, 10, 1000]



Output shape: torch.Size([2, 10, 1000])


In [4]:
%pip install tqdm boto3 requests regex sentencepiece sacremoses

Note: you may need to restart the kernel to use updated packages.


In [15]:
%pip install transformers tokenizers

Note: you may need to restart the kernel to use updated packages.


In [17]:
from transformers import pipeline

transcriber = pipeline(
    task="automatic-speech-recognition", model="openai/whisper-large-v3"
)
result = transcriber(
    "https://huggingface.co/datasets/Narsil/asr_dummy/resolve/main/mlk.flac"
)
print(result)

ImportError: cannot import name 'pipeline' from 'transformers' (/opt/homebrew/anaconda3/lib/python3.11/site-packages/transformers/__init__.py)

In [None]:
import 

In [11]:
help(nn.Embedding)

Help on class Embedding in module torch.nn.modules.sparse:

class Embedding(torch.nn.modules.module.Module)
 |  Embedding(num_embeddings: int, embedding_dim: int, padding_idx: Optional[int] = None, max_norm: Optional[float] = None, norm_type: float = 2.0, scale_grad_by_freq: bool = False, sparse: bool = False, _weight: Optional[torch.Tensor] = None, _freeze: bool = False, device=None, dtype=None) -> None
 |  
 |  A simple lookup table that stores embeddings of a fixed dictionary and size.
 |  
 |  This module is often used to store word embeddings and retrieve them using indices.
 |  The input to the module is a list of indices, and the output is the corresponding
 |  word embeddings.
 |  
 |  Args:
 |      num_embeddings (int): size of the dictionary of embeddings
 |      embedding_dim (int): the size of each embedding vector
 |      padding_idx (int, optional): If specified, the entries at :attr:`padding_idx` do not contribute to the gradient;
 |                                   the

In [19]:
A = torch.randn(2048, 2048, device='mps', dtype=torch.bfloat16)
B = torch.randn(2048, 2048, device='mps', dtype=torch.bfloat16)
ref = torch.mm(A, B)
for _ in range(1000):
    assert (torch.mm(A, B) - ref).abs().max().item() == 0


In [64]:
help(torch.kl_div(torch.tensor([1.0]), torch.tensor([1.0])))

Help on Tensor in module torch object:

class Tensor(torch._C.TensorBase)
 |  Method resolution order:
 |      Tensor
 |      torch._C.TensorBase
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __abs__ = abs(...)
 |  
 |  __array__(self, dtype=None)
 |  
 |  __array_wrap__(self, array)
 |      # Wrap Numpy array again in a suitable tensor when done, to support e.g.
 |      # `numpy.sin(tensor) -> tensor` or `numpy.greater(tensor, 0) -> ByteTensor`
 |  
 |  __contains__(self, element: Any, /) -> bool
 |      Check if `element` is present in tensor
 |      
 |      Args:
 |          element (Tensor or scalar): element to be checked
 |              for presence in current tensor"
 |  
 |  __deepcopy__(self, memo)
 |  
 |  __dir__(self)
 |      Default dir() implementation.
 |  
 |  __dlpack__(self, stream=None)
 |      Creates a DLpack `capsule https://data-apis.org/array-api/latest/design_topics/data_interchange.html#data-interchange`_
 |      of the current tensor to be 

In [70]:
torch.kl_div(torch.tensor([10.0]), torch.tensor([1.0]))

tensor([-10.])

In [71]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
from tqdm import tqdm
# ['<ADJLIST_START>', '(3,5)', '<-->', '(2,5)', ';', '(3,2)', '<-->', '(3,3)', ';', '(3,4)', '<-->', '(3,3)', ';', '(0,4)', '<-->', '(1,4)', ';', '(2,3)', '<-->', '(2,4)', ';', '(1,5)', '<-->', '(2,5)', ';', '(3,4)', '<-->', '(3,5)', ';', '(0,1)', '<-->', '(0,0)', ';', '(0,2)', '<-->', '(0,1)', ';', '(3,2)', '<-->', '(2,2)', ';', '(2,3)', '<-->', '(1,3)', ';', '(1,4)', '<-->', '(1,5)', ';', '(1,0)', '<-->', '(0,0)', ';', '(1,3)', '<-->', '(1,2)', ';', '(1,2)', '<-->', '(2,2)', ';', '(0,4)', '<-->', '(0,3)', ';', '(0,2)', '<-->', '(0,3)', ';', '<ADJLIST_END>', '<ORIGIN_START>', '(0,3)', '<ORIGIN_END>', '<TARGET_START>', '(1,3)', '<TARGET_END>', '<PATH_START>']
class MazeDataset(torch.utils.data.Dataset):
    def __init__(self, df):
        super().__init__()
        d = {'<ADJLIST_START>':1,'<ADJLIST_END>':2, '<ORIGIN_START>':3,'<ORIGIN_END>':4,'<TARGET_START>':5,'<TARGET_END>':6, '<PATH_START>':7,';':8,'<-->':9}
        mx = max(d.values())+1
        f = lambda x: x[0]*6+x[1]+mx
        V = f((5,5))+1
        print(V)
        def encode(df):
            y = []
            for i in tqdm(range(df.shape[0])):
                out_tokens = eval(df.iloc[i]['output_path'])
                in_tokens = eval(df.iloc[i]['input_sequence'])
                y_out = [1]
                y_in = []
                for token in in_tokens:
                    # if token in (,): continue
                    y_in.append(f(eval(token)) if len(token) == 5 else d[token])
                for token in out_tokens:
                    if len(token) == 5:
                        p = eval(token)
                        y_out.append(p[0]*6+p[1]+3)
                    else: y_out.append(2)
                if len(y_in) < 249: y_in.extend([0]*(249-len(y_in))) #pad
                if len(y_out) < 38: y_out.extend([0]*(38-len(y_out))) 
                y.append((torch.LongTensor(y_in), torch.LongTensor(y_out))) 
            return y
        self.data = encode(df)
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        return self.data[index]

df_train, df_val = train_test_split(df, train_size=0.9, random_state=42, stratify=df['maze_type'])
train_dataset = MazeDataset(df_train)
val_dataset = MazeDataset(df_val)
test_dataset = MazeDataset(df_test)

In [72]:

class BahdanauAttention(nn.Module):
    def __init__(self, n_enc, n_dec, n_attn):
        super().__init__()
        self.W_a = nn.Linear(n_dec, n_attn, bias=False)
        self.U_a = nn.Linear(n_enc, n_attn, bias=False)
        self.v_a = nn.Linear(n_attn, 1, bias=False)

    def forward(self, h, s_i_1,mask):
        """
        h:(batch, seq_len, n)
        s_{i-1}: (batch, n)  -> prev hidden state of decoder
        """
        energy = torch.tanh(self.U_a(h) + self.W_a(s_i_1).unsqueeze(1))

        alpha = self.v_a(energy).squeeze(-1)
        if mask is not None:
            alpha = alpha.masked_fill(~mask, -1e4)
        attn_weights = F.softmax(alpha, dim=-1)

        # context: (batch, hidden_size)
        context = torch.bmm(attn_weights.unsqueeze(1), h).squeeze(1)

        return context, attn_weights

In [73]:
class RNNenc(nn.Module):
    def __init__(self, K_x, m_x, n_enc, padding_idx=0, dropout=0):
        super().__init__()
        self.embeder = nn.Embedding(K_x, m_x, padding_idx=padding_idx)
        self.rnn = nn.RNN(
            input_size=m_x,
            hidden_size=n_enc,
            num_layers=2,
            nonlinearity='tanh',
            batch_first=True,
            dropout = dropout,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
    def forward(self, x):
        emb = self.dropout(self.embeder(x))
        h, h_last = self.rnn(emb)
        return h, h_last[[0,2]]


In [None]:
class RNNdec(nn.Module):
    def __init__(self, K_y, m_y, n_dec, n_enc, n_attn,out_dim, paddin_idx,dropout=0):
        super().__init__()
        self.embeder = nn.Embedding(K_y, m_y,padding_idx=paddin_idx)
        self.attention = BahdanauAttention(n_enc,n_dec=n_dec, n_attn=n_attn)
        self.rnn = nn.RNN(m_y+n_enc, 
                          n_dec,
                          num_layers=2,
                          batch_first=True,
                          nonlinearity='tanh',dropout=dropout)
        self.out = nn.Linear(n_dec+m_y+n_attn, out_dim)
        self.dropout=nn.Dropout(dropout)
    def forward(self, h, h_last, y=None, T_y=None, 
                teacher_forcing = 0.5, mask=None,starter=1):
        batch_size = h.size(0)
        teacher_size = int(batch_size*teacher_forcing)
        device = h.device
        if y is not None: T_y = y.size(1)-1
        out_len = T_y
        s_t = h_last
        y_in = torch.full((batch_size,),starter, device=device, dtype=torch.long)
        outputs = []
        all_attn_weights = []
        for t in range(out_len):
            emb = self.dropout(self.embeder(y_in)).unsqueeze(1)
            context, attn_weights = self.attention.forward(h, s_t[-1], mask=mask)
            context_unsq = context.unsqueeze(1) # so that rnn runs for 1 unit
            rnn_in = torch.cat([emb, context_unsq], dim=2)
            all_attn_weights.append(attn_weights.unsqueeze(1)) # unsqueeze so we can cat all later on
            _, s_t = self.rnn(rnn_in, s_t)
            out_in = torch.cat([s_t[-1],context,emb.squeeze(1)], dim=1)
            logits = self.out(out_in) # (batch,out_dim)
            outputs.append(logits.unsqueeze(1))
            if t < out_len-1:
                y_in = logits.argmax(dim=1)
                if y is not None:
                    # teacher forcing
                    idxs = np.arange(batch_size)
                    np.random.shuffle(idxs)
                    y_in[idxs[:teacher_size]] = y[idxs[:teacher_size],t+1]
        outputs = torch.cat(outputs, dim=1)
        all_attn_weights = torch.cat(all_attn_weights,dim=1)
        return outputs, all_attn_weights
    def generate(self, h, h_last, adj, end, T_y, mask=None,starter=1):
        batch_size = h.size(0)
        device = h.device
        out_len = T_y
        s_t = h_last
        y_in = torch.full((batch_size,),starter, device=device, dtype=torch.long)
        outputs = []
        for t in range(out_len):
            emb = self.dropout(self.embeder(y_in)).unsqueeze(1)
            context, _ = self.attention.forward(h, s_t[-1], mask=mask)
            context_unsq = context.unsqueeze(1) # so that rnn runs for 1 unit
            rnn_in = torch.cat([emb, context_unsq], dim=2)
            _, s_t = self.rnn(rnn_in, s_t)
            out_in = torch.cat([s_t[-1],context,emb.squeeze(1)], dim=1)
            logits = self.out(out_in) # (batch,out_dim)
            x = y_in-3
            y_in = logits.argmax(dim = 1)
            for i in range(batch_size):
                if x[i] < 0:
                    continue
                if x[i] == end[i]:
                    y_in[i] = 2
                    continue
                mx = -1e4
                y_j = 0
                for y in adj[i][x[i]]:
                    if mx < logits[i,y+3].item():
                        mx = logits[i,y+3].item()
                        y_j = y
                y_in[i] = y_j+3
            outputs.append(y_in.unsqueeze(0))
        outputs = torch.cat(outputs, dim=0)
        return outputs

In [None]:
class seq2seqBahdanau(nn.Module):
    def __init__(self, K_x,m_x, K_y,m_y, n_enc, n_attn, n_dec, out_dim,dropout=0):
        super().__init__()
        self.encoder = RNNenc(K_x, m_x, n_enc, padding_idx=0, dropout=dropout)
        self.decoder = RNNdec(K_y, m_y, n_dec, n_enc, n_attn, out_dim, paddin_idx=0,dropout=dropout)
    def forward(self, X, y=None, T_y=None, teacher_forcing=0.5, mask=None, starter = 1):
        h, h_last = self.encoder(X)
        logits, attn_weights = self.decoder(h, h_last, y, T_y, teacher_forcing, mask, starter)
        return logits, attn_weights
    def generate(self, X, adj, end, T_y=None, mask=None, starter = 1):
        h, h_last = self.encoder(X)
        self.decoder.generate(h, h_last, adj, end, T_y, mask, starter)

In [83]:
def collate_fn(batch, pad_idx=0, adj_end=2):
    inp = [item[0].unsqueeze(0) for item in batch]
    out = [item[1].unsqueeze(0) for item in batch]
    adj = [[[] for _ in range(36)] for _ in range(len(inp))]
    end = [0]*len(inp)
    for i in range(len(inp)):
        for j in range(1, len(inp[i]), 4):
            if inp[i][j] == adj_end:
                end[i] = inp[i][j+5]-10
                break
            x = torch.argmax(inp[i][j])-10
            y = torch.argmax(inp[i][j+2])-10
            adj[i][x].append(y)
            adj[i][y].append(x)
    inp_seqs = torch.cat(inp, dim = 0)  # (batch, T_x)
    out_paths = torch.cat(out, dim = 0) # (batch, T_y)
    inp_mask = (inp_seqs != pad_idx)
    return {'inp': inp_seqs, 'out': out_paths, 'inp_mask': inp_mask, 'adj':adj, 'end':end}



In [None]:
def evaluate(model, dataloader, device=torch.device(0), pad_idx=0, eos = 2):
    model.eval()
    # total_loss = 0.0
    total_tokens = 0
    total_seq_wrongs = 0
    total_tp = total_fp = total_fn = 0

    with torch.no_grad():
        for batch in tqdm(dataloader):
            inp = batch['inp'].to(device)
            out = batch['out'].to(device)
            inp_mask = batch['inp_mask'].to(device)
            adj = batch['adj']
            end = batch['end']
            preds = model.generate(inp, adj, end, out.size(1)-1, mask=inp_mask) # logits: (B, T_y-1, V)
            preds, _ = decode(preds)
            targets = out[:, 1:].contiguous() # (B, T_y-1)
            total_tokens += num_tokens
            total_seq_wrongs += int(((preds != targets) & tgt_mask).any(dim=1).sum().item())

            # micro F1 counts (exclude pads)
            for p_row, t_row, m_row in zip(preds, targets, tgt_mask):
                # filter out pad positions
                if not m_row.any():
                    continue
                p_list = p_row[m_row].tolist()
                t_list = t_row[m_row].tolist()
                pred_ctr = Counter(p_list)
                act_ctr = Counter(t_list)
                tp = sum((pred_ctr & act_ctr).values())
                fp = sum((pred_ctr - act_ctr).values())
                fn = sum((act_ctr - pred_ctr).values())
                total_tp += tp
                total_fp += fp
                total_fn += fn

    # Final metrics
    seq_accuracy = 1.0 - (total_seq_wrongs / len(dataloader.dataset))

    if (total_tp + total_fp) > 0:
        p = total_tp / (total_tp + total_fp)
    else:
        p = 0.0
    if (total_tp + total_fn) > 0:
        r = total_tp / (total_tp + total_fn)
    else:
        r = 0.0
    f1 = (2 * p * r / (p + r)) if (p + r) > 0 else 0.0

    return seq_accuracy, f1

In [92]:
a = torch.tensor([1,2,3]).unsqueeze(1)
b = torch.tensor([1,2,3]).unsqueeze(1)

In [99]:
(torch.cat([a,b], dim = -1))

tensor([[1, 1],
        [2, 2],
        [3, 3]])

In [114]:
torch.randint(10, (1,)).item()

4