In [1]:
import os
import sys
from datasets import load_dataset 
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers import GenerationConfig 
from transformers import pipeline
print('ready loading data')

# Check Python version
print("Python version:", sys.version)

# Check Conda environment
conda_env = os.environ.get('CONDA_DEFAULT_ENV')
print("Conda environment:", conda_env)

ready loading data
Python version: 3.9.19 (main, May  6 2024, 20:12:36) [MSC v.1916 64 bit (AMD64)]
Conda environment: base


In [2]:
import torch
import torch.nn as nn
print('ready')

ready


In [3]:
d_model = 512
n_heads = 8
num_encoder_layers = 6
num_decoder_layers = 6


model = nn.Transformer(
    d_model= d_model,
    nhead = n_heads,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers
)

In [4]:
model

Transformer(
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
        )
        (linear1): Linear(in_features=512, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=2048, out_features=512, bias=True)
        (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
      (1): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
        )
        (linear1): Linear(in_features=512, out_features=2048, bias=True)
        (dropout): Dropout(p=0.1, in

# Positional Encoding Class

In [5]:
import math

In [7]:
class PositionalEncoder(nn.Module):
    def __init__(self,d_model,max_seq_length=512):
        super(PositionalEncoder,self).__init__()
        self.d_model = d_model
        self.max_seq_length = max_seq_length
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0,max_seq_length,
                                dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0,d_model,2,dtype=torch.float)*-(math.log(10000.0)/d_model))

        pe[:,0::2] = torch.sin(position * div_term)
        pe[:,1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe',pe)

    def forward(self,x):
        x = x + self.pe[:,:x.size(1)]
        return x



# Multi-Head Self Attention

In [8]:
import torch.nn as nn
import torch.nn.functional as F

In [10]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.head_dim = d_model//num_heads

        self.query_linear = nn.Linear(d_model,d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.output_linear = nn.Linear(d_model, d_model)
    def split_heads(self, x,batch_size):
        x = x.view(batch_size, -1,self.num_heads, self.head_dim)
        return x.permute(0,2,1,3).contiguous().view(batch_size*self.num_heads,-1, self.head_dim)


    def compute_attention(self, query, key, value, mask=None):
        scores = torch.matmul(query,key.permute(1,2,0))
        if mask is not None:
            scores =  scores.masked_fill(mask ==0, float("-1e9"))
        attention_weights = F.softmax(scores, dim=-1)
        return attention_weights
    
    def forward(self, query, key, value, mask=None):
        batch_size = query.size(0)

        query = self.split_heads(self.query_linear(query),batch_size)
        key = self.split_heads(self.query_linear(key),batch_size)
        value = self.split_heads(self.value_linear(value), batch_size)

        attention_weights =  self.compute_attention(query, key, mask )

        output = torch.matmul(attention_weights, value)
        output = output.view(batch_size,self.num_heads,-1, self.head_dim).permute(0,2,1,3).contiguous().view(batch_size,-1,self.d_model)
        
        return self.output_linear(output)



In [12]:
class FeedForwardSubLayer(nn.Module):
    def __init__(self,d_model,d_ff):
        super(FeedForwardSubLayer, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff,d_model)
        self.relu = nn.ReLU()

    def forward(self,x):
        return self.fc2(self.relu(self.fc1(x)))

In [13]:
class EncoderLayer(nn.Module):
    def __init__(self,d_model,num_heads, d_ff,dropout):
        super(EncoderLayer,self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForwardSubLayer(d_model,d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        attn_output =  self.self_attn(x,x,x,mask)
        x = self.norm1(x +  self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x


In [14]:
# Transformer body: encoder

In [15]:
class TransformerEncoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length):
        super(TransformerEncoder, self).__init__()
        self.embedding =  nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoder(d_model,max_sequence_length,)
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model,num_heads,d_ff,dropout) for _ in range(num_layers)]
        )
    
    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x,mask)
        return x



In [16]:
# Transformer Head

In [18]:
class ClassifierHead(nn.Module):
    def __init__(self, d_model,num_classes):
        super(ClassifierHead,self).__init__()
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x):
        logits = self.fc(x)
        return F.log_softmax(logits,dim =-1)

In [19]:
class RegressionHead(nn.Module):
    def __init__(self, d_model, output_dim):
        super(RegressionHead,self).__init__()
        self.fc = nn.Linear(d_model, output_dim)

    def forward(self, x):
        return self.fc(x)

In [29]:
num_classes = 3
vocab_size = 10000
batch_size=8
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
sequence_length = 64
dropout =  0.1

In [30]:
input_sequence = torch.randint(0, vocab_size, (batch_size, sequence_length))
mask = torch.randint(0, 2, (sequence_length, sequence_length))

# Instantiate the encoder transformer's body and head
encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length=sequence_length)
classifier = ClassifierHead(d_model, num_classes)

# Complete the forward pass 
output = encoder(input_sequence, mask)
classification = classifier(output)
print("Classification outputs for a batch of ", batch_size, "sequences:")
print(classification)

Classification outputs for a batch of  8 sequences:
tensor([[[-0.7243, -1.5759, -1.1760],
         [-0.9598, -0.6743, -2.2301],
         [-0.4984, -1.4302, -1.8760],
         ...,
         [-0.4939, -1.4173, -1.9148],
         [-1.7739, -1.0527, -0.7312],
         [-0.8645, -0.9250, -1.7027]],

        [[-1.4416, -1.1453, -0.8090],
         [-1.0022, -0.8960, -1.4929],
         [-0.8281, -1.4940, -1.0828],
         ...,
         [-1.5381, -0.6752, -1.2867],
         [-1.0113, -1.7953, -0.7547],
         [-1.2732, -0.7470, -1.4013]],

        [[-0.4506, -1.5876, -1.8430],
         [-0.8690, -0.8793, -1.7983],
         [-0.7233, -1.3814, -1.3333],
         ...,
         [-0.9080, -2.1711, -0.7285],
         [-0.8205, -1.9861, -0.8614],
         [-1.2109, -2.5175, -0.4757]],

        ...,

        [[-0.7390, -1.4116, -1.2777],
         [-1.2300, -1.5466, -0.7037],
         [-0.8338, -1.0978, -1.4610],
         ...,
         [-1.0698, -1.2931, -0.9610],
         [-0.6958, -1.8967, -1.0462]

In [31]:
# Decoder Transformers

In [34]:
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length):
        super(TransformerEncoder, self).__init__()
        self.embedding =  nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoder(d_model,max_sequence_length,)
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model,num_heads,d_ff,dropout) for _ in range(num_layers)]
        )
    
    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x,mask)
        return x

In [32]:
self_attention_mask = (1-torch.triu(
    torch.ones(1,sequence_length, sequence_length),
    diagonal=1)).bool()
'''
'''

output = decoder(input_sequence,self_attention_mask)

NameError: name 'decoder' is not defined

In [33]:
# Transformer body  (decoder) and head

In [35]:
class DecoderOnlyTransformer(nn.Module):
    def __init__(self,vocab_size,d_model, num_layers, num_heads,d_ff,dropout,max_sequence_length):
        super(TransformerDecoder, self ).__init__()
        self.embedding = nn.Embedding(vocab_size,d_model)
        self.positional_encoding = PositionalEncoder(d_model, max_sequence_length)
        self.layers = nn.ModuleList([DecoderLayer(d_model,num_heads,d_ff,dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model,vocab_size)

    def forward(self, x, self_mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x,self_mask)
        x = self.fc(x)
        return F.log_softmax(x,dim=-1)


In [36]:
# implementing decoder

In [37]:
input_sequence = torch.randint(0, vocab_size, (batch_size, sequence_length))

# Create a triangular attention mask for causal attention
self_attention_mask = (1 - torch.triu(torch.ones(1, sequence_length, sequence_length), diagonal=1)).bool()  # Upper triangular mask

# Instantiate the decoder transformer
decoder = TransformerDecoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length=sequence_length)

output = decoder(input_sequence, self_attention_mask)
print(output.shape)
print(output)

TypeError: super(type, obj): obj must be an instance or subtype of type

In [38]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model,num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        '''
        '''
    def forward(self, x,y,causal_maks,cross_mask):
        self_attn_output = self.self_attn(x,x,x,causal_maks)
        x = self.norm1(x+self.dropout(self_attn_output))

        cross_attn_output = self.cross_attn(x,y,y,cross_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))
        ''' 
        '''

In [40]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        
        # Initialize the causal (masked) self-attention and cross-attention
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForwardSubLayer(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, causal_mask, encoder_output, cross_mask):
        # Pass the necessary arguments to the causal self-attention and cross-attention
        self_attn_output = self.self_attn(x, x, x, causal_mask)
        x = self.norm1(x + self.dropout(self_attn_output))
        cross_attn_output = self.cross_attn(x, encoder_output, encoder_output, cross_mask)
        x = self.norm2(x + self.dropout(cross_attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

In [41]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads,num_layers,d_ff,max_seq_len,dropout):
        super(Transformer, self).__init__()

        self.encoder = TransformerEncoder(vocab_size, d_model, num_heads,num_layers,
                                          num_heads, d_ff,max_seq_len, dropout)
        self.decoder = TransformerDecoder(vocab_size, d_model, num_heads, num_layers,
                                          num_heads, d_ff, max_seq_len,dropout)
        
    def forward(self, src, src_mask, causal_mask):
        encoder_output = self.encoder(src, src_mask)
        decoder_output = self.decoder(src, encoder_output, causal_mask, mask)

        return decoder_output


In [42]:
# Create a batch of random input sequences
input_sequence = torch.randint(0, vocab_size, (batch_size, sequence_length))
padding_mask = torch.randint(0, 2, (sequence_length, sequence_length))
causal_mask = torch.triu(torch.ones(sequence_length, sequence_length), diagonal=1)

# Instantiate the two transformer bodies
encoder = TransformerEncoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length=sequence_length)
decoder = TransformerDecoder(vocab_size, d_model, num_layers, num_heads, d_ff, dropout, max_sequence_length=sequence_length)

# Pass the necessary masks as arguments to the encoder and the decoder
encoder_output = encoder(input_sequence, padding_mask)
decoder_output = decoder(input_sequence, causal_mask, encoder_output, padding_mask)
print("Batch's output shape: ", decoder_output.shape)

TypeError: super(type, obj): obj must be an instance or subtype of type

In [43]:
# TRANSFOMER ENCODER

In [44]:
# Initialize positional encoding layer and stack of EncoderLayer modules
class TransformerEncoder(nn.Module):
  
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len, dropout):
        super(TransformerEncoder, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoder(d_model, max_seq_len)
        self.layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        x = self.embedding(x)
        x = self.positional_encoding(x)
        x = self.dropout(x)
        
        # Pass the sequence through each layer in the encoder
        for layer in self.layers:
            x = layer(x, mask)
        
        return x

class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len, dropout):
        super(Transformer, self).__init__()
        # Initialize the encoder stack of the Transformer
        self.encoder = TransformerEncoder(vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_len, dropout)
        
    def forward(self, src, src_mask):
        encoder_output = self.encoder(src, src_mask)
        return encoder_output