In [116]:
import torch as t
import torch.nn as nn
t.manual_seed(3407)

<torch._C.Generator at 0x78dc59378470>

# Creating dataset
### Dataset will be of size 19968*7 we will rearrange it to 312 * 64 * 7


In [117]:
Data = t.randint(0, 9, (19968,7))
Data = Data.reshape(312, 64, 7)

In [118]:
Data.shape

torch.Size([312, 64, 7])

In [119]:
train_X = Data[:250,:,:]
test_X = Data[251:,:,:]
train_X.shape, test_X.shape

(torch.Size([250, 64, 7]), torch.Size([61, 64, 7]))

In [120]:
sorted_y = t.sort(Data).values
reverse_y = t.flip(Data, [2])
sorted_y.shape, reverse_y.shape

(torch.Size([312, 64, 7]), torch.Size([312, 64, 7]))

In [121]:
Y_DATA = t.cat((sorted_y, reverse_y), 2)

In [122]:
train_y = Y_DATA[:250,:]
test_y = Y_DATA[251:,:] 

# Model Building

In [123]:
class Embedding(nn.Module):
    def __init__(self,embedding_dims):
        super(Embedding, self).__init__()
        self.emb = nn.Linear(1,embedding_dims)
    def forward(self, x):
        x = x.float() 
        x = x.unsqueeze(-1)
        return self.emb(x)

In [124]:
class MultiHeadAttention(nn.Module):
    def __init__(self, embedding_dims=512, heads=1, mask = False):
        super(MultiHeadAttention, self).__init__()
        self.embedding_dims = embedding_dims
        self.heads = heads
        self.feature_map = embedding_dims//heads
        self.mask = mask
        self.Wq = nn.Parameter(t.randn(embedding_dims, embedding_dims, dtype = t.float32)) # parameter class makes it learnable
        self.Wk = nn.Parameter(t.randn(embedding_dims, embedding_dims, dtype = t.float32)) # parameter class makes it learnable
        self.Wv = nn.Parameter(t.randn(embedding_dims, embedding_dims, dtype = t.float32)) # parameter class makes it learnable
    def forward(self,X:t.Tensor, enc_output):
        self.Xshape = X.shape
        # print(self.Xshape)
        self.Xq = X.clone()
        self.Xk = enc_output if enc_output is not None else X.clone()
        self.Xv = enc_output if enc_output is not None else X.clone()
        self.Q = t.matmul(self.Xq,self.Wq)
        self.K = t.matmul(self.Xk,self.Wk)
        self.V = t.matmul(self.Xv,self.Wv)
        # concatination
        self.Q = self.Q.view(self.Xshape[0], self.Xshape[1], self.heads, self.feature_map).transpose(1,2)
        self.K = self.K.view(self.Xshape[0], self.Xshape[1], self.heads, self.feature_map).transpose(1,2)
        self.V = self.V.view(self.Xshape[0], self.Xshape[1], self.heads, self.feature_map).transpose(1,2)
        return self.product()
    def product(self):
        product = t.matmul(self.Q, self.K.transpose(-2,-1))
        scale = t.sqrt(t.tensor(self.embedding_dims//self.heads))
        product = product/scale
        if self.mask:
            tril = t.tril(t.ones(self.Xshape[1], self.Xshape[1]))
            product = product.masked_fill(tril == 0, float('-inf'))
        product = t.nn.functional.softmax(product, dim=-1)
        contextual_embedding = t.matmul(product, self.V)
        contextual_embedding = contextual_embedding.transpose(1, 2).contiguous()
        output = contextual_embedding.view(self.Xshape[0], self.Xshape[1], self.embedding_dims)
        return output

In [125]:
#Self wrote
class PositionalEncoding():
    def __init__(self, embedding_dims = 512, context = 512):
        self.embedding_size = embedding_dims
        self.context = context
        self.matrix = t.zeros(self.context, self.embedding_size, dtype = t.float32)
        for pos in range(self.context):
            for i in range(self.embedding_size):
                if(i%2==0):
                    power = i/self.embedding_size
                    inner = pos/(10000**power)
                    self.matrix[pos][i] = t.sin(inner)
                else:
                    power = (i-1)/self.embedding_size
                    inner = pos/(10000**power)
                    self.matrix[pos][i]=t.cos(inner)
    def forward(self, x):
        return x+self.matrix[:x.shape[1], :]

In [126]:
#Ai Generated
import torch
import torch.nn as nn
import math

class PositionalEncoding(nn.Module):
    def __init__(self, embedding_dims=512, context=512):
        super().__init__()

        pe = torch.zeros(context, embedding_dims)
        position = torch.arange(0, context, dtype=torch.float32).unsqueeze(1)

        div_term = torch.exp(
            torch.arange(0, embedding_dims, 2, dtype=torch.float32)
            * (-math.log(10000.0) / embedding_dims)
        )

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)  # shape: (1, context, embedding_dims)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x shape: (batch, seq_len, embedding_dims)
        # print("positional_shape: ",self.pe.shape)
        # print("x_shape: ", x.shape)
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]


In [127]:
class FeedForwardNN(nn.Module):
    def __init__(self, embedding_dims):
        super(FeedForwardNN,self).__init__()
        self.z1 = nn.Linear(embedding_dims,512)
        self.z2 = nn.Linear(512,embedding_dims)
    def forward(self, x):
        x = self.z1(x)
        x = nn.ReLU()(x)
        x = self.z2(x)
        return x

In [128]:
d_model = 12
heads = 8
context = 7
class Encoder(nn.Module):
    def __init__(self, d_model, heads, context, first=False):
        super(Encoder, self).__init__()
        self.first = first
        self.embedding = Embedding(d_model)
        self.pos = PositionalEncoding(embedding_dims = d_model, context = 7)
        self.attention = MultiHeadAttention(embedding_dims=d_model, heads=heads, mask = False)
        self.feedforward = FeedForwardNN(embedding_dims=d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
    def forward(self, x):
        if(self.first):
            x = self.pos(self.embedding(x))
            # print(f"DEBUG - After Embedding: {x.shape}")
        # print(f"DEBUG - After Positional: {x.shape}")
        contextual_embedding = self.attention(x, None)
        # print(f"DEBUG - After Attention: {contextual_embedding.shape}")
        add = x + contextual_embedding
        norm = self.norm1(add)
        z = self.feedforward(norm)
        z = z + norm
        z = self.norm2(z)
        return z

In [129]:
class Decoder(nn.Module):
    def __init__(self, d_model, heads, context, first=False):
        super(Decoder, self).__init__()
        self.first = first
        self.embedding = Embedding(d_model)
        self.pos = PositionalEncoding(embedding_dims=d_model, context=context)
        self.masked_attention = MultiHeadAttention(embedding_dims=d_model, heads=heads, mask=True)
        self.norm1 = nn.LayerNorm(d_model)
        self.cross_attention = MultiHeadAttention(embedding_dims=d_model, heads=heads, mask=False)
        self.norm2 = nn.LayerNorm(d_model)
        self.feedforward = FeedForwardNN(embedding_dims=d_model)
        self.norm3 = nn.LayerNorm(d_model)

    def forward(self, x, encoder_output):
        if(self.first):
            x = self.pos(self.embedding(x))
        
        attn1_out = self.masked_attention(x, encoder_output)
        x = self.norm1(x + attn1_out)
        attn2_out = self.cross_attention_forward(x, encoder_output)
        x = self.norm2(x + attn2_out)
        ff_out = self.feedforward(x)
        z = self.norm3(x + ff_out)
        return z

    def cross_attention_forward(self, x, enc_out):
        return self.cross_attention(x, enc_out)

In [130]:
import torch
import torch.nn as nn

class prediction_layer(nn.Module):
    def __init__(self, input_dims):
        super(prediction_layer, self).__init__()
        self.linear = nn.Linear(input_dims, 140)

    def forward(self, x):
        raw_output = self.linear(x) 
        logits = raw_output.view(-1, 14, 10)
        return logits

In [131]:
class TransformerNN(nn.Module):
    def __init__(self):
        super(TransformerNN, self).__init__()
        self.en1 = Encoder(12, 1, 7, True)
        self.en2 = Encoder(12, 1, 7)
        self.en3 = Encoder(12, 1, 7)
        self.en4 = Encoder(12, 1, 7)
        self.en5 = Encoder(12, 1, 7)
        self.en6 = Encoder(12, 1, 7)
        self.en7 = Encoder(12, 1, 7)
        self.de1 = Decoder(12, 1, 7, True)
        self.de2 = Decoder(12, 1, 7)
        self.de3 = Decoder(12, 1, 7)
        self.de4 = Decoder(12, 1, 7)
        self.de5 = Decoder(12, 1, 7)
        self.de6 = Decoder(12, 1, 7)
        self.de7 = Decoder(12, 1, 7)
        self.pred_layer = prediction_layer(84)
    def forward(self, x):
        # enc_out = x
        enc_out = self.en1(x)
        # print('shape: ', x.shape)
        enc_out = self.en2(enc_out)
        enc_out = self.en3(enc_out)
        enc_out = self.en4(enc_out)
        enc_out = self.en5(enc_out)
        enc_out = self.en6(enc_out)
        enc_out = self.en7(enc_out)
        # print('enc_out.shape: ',enc_out.shape)
        dec_out = self.de1(x=x, encoder_output=enc_out)
        # print("de1: ", dec_out.shape)
        dec_out = self.de2(dec_out, enc_out)
        dec_out = self.de3(dec_out, enc_out)
        dec_out = self.de4(dec_out, enc_out)
        dec_out = self.de5(dec_out, enc_out)
        dec_out = self.de6(dec_out, enc_out)
        dec_out = self.de7(dec_out, enc_out)
        # print("decoder_ouput: ", dec_out.shape)
        dec_out = dec_out.reshape(64,-1)
        y_pred = self.pred_layer(dec_out)
        return y_pred

# Training


In [132]:
model = TransformerNN()
criterion = nn.CrossEntropyLoss()
optimizer = t.optim.Adam(model.parameters(), 0.0001)

In [133]:
# epochs = 10
# for epoch in range(epochs):
#     for x,y in zip(train_X,train_y):
#         # x = x.to(dtype=t.float32)
#         # y = y.to(dtype=t.float32)
#         # print(x.shape)
#         logits = model(x)
#         # print(logits.shape)
#         # print(y.shape)
#         logits = logits.view(-1,10)
#         # print("logits type:- ", logits.dtype)
#         targets = y.view(-1)
#         # print("targets type:- ", targets.dtype)
#         loss = criterion(logits, targets)
#         # optimizer.backward()
#         model.zero_grad()
#         loss.backward()
#         optimizer.step()
#     print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, 10, loss.item()))

In [134]:
epochs = 10
for epoch in range(epochs):
    running_loss = 0.0  # <--- Step 1: Initialize a counter
    
    for x, y in zip(train_X, train_y):
        optimizer.zero_grad()        # <--- Standard practice: zero first
        
        logits = model(x).view(-1, 10)
        targets = y.view(-1)
        
        loss = criterion(logits, targets)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()  # <--- Step 2: Add up the loss
        
    # Step 3: Calculate average (Divide by total number of items)
    avg_loss = running_loss / len(train_X)
    print('Epoch [{}/{}], Avg Loss: {:.4f}'.format(epoch+1, 10, avg_loss))

Epoch [1/10], Avg Loss: 2.0159
Epoch [2/10], Avg Loss: 1.7448
Epoch [3/10], Avg Loss: 1.6436
Epoch [4/10], Avg Loss: 1.5699
Epoch [5/10], Avg Loss: 1.5092
Epoch [6/10], Avg Loss: 1.4577
Epoch [7/10], Avg Loss: 1.4078
Epoch [8/10], Avg Loss: 1.3718
Epoch [9/10], Avg Loss: 1.3391
Epoch [10/10], Avg Loss: 1.3156


In [135]:
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params}")


Total parameters: 201516
