In [1]:
import sys
import os

# Make the directory two levels above the current working directory importable
# so that `import deeppy` below can resolve. os.path.abspath resolves '../..'
# against the kernel's working directory, so this assumes the notebook is
# started from its own folder — TODO confirm. Must run before `import deeppy`.
sys.path.append(os.path.abspath('../..'))

import warnings
# Silences every Python warning for the rest of the session (including
# deprecation warnings from torch / deeppy).
warnings.filterwarnings('ignore')


import deeppy as dp

import torch
import torch.optim as optim
import torch.nn as nn
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# Every tensor created below is expected to be moved to this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# NOTE(review): numpy, matplotlib, tiktoken and load_dataset are not used in the
# visible part of this notebook — confirm whether they are still needed.
import numpy as np
import matplotlib.pyplot as plt

import tiktoken
from datasets import load_dataset


# NOTE(review): the model is later built through `dp.cv.Sane`; the names imported
# here (LearnFrame, LayerGenerator, FromLoader, Network, Sane) are not referenced
# directly in the visible cells.
from deeppy import LearnFrame,LayerGenerator,FromLoader
from deeppy import Network
from deeppy.models.cv import Sane


In [2]:
# --- Shape of the synthetic input batch -------------------------------------
# batch_size samples, each a sequence of context_size tokens of width input_dim.
batch_size, context_size, input_dim = 32, 15, 20

# --- Model widths -------------------------------------------------------------
# Token embedding width, per-token latent width, and projection-head output width.
embed_dim, latent_dim, projection_dim = 64, 128, 10

# --- Transformer configuration -------------------------------------------------
num_heads, num_layers = 4, 4
dropout = 0.1
bias = False

In [3]:
# Optimiser configuration handed to the model: AdamW (lr 3e-4, AMSGrad variant)
# with gradient-norm clipping at 1.0 and no learning-rate scheduler.
Optimizer_params = dict(
    optimizer=optim.AdamW,
    optimizer_args=dict(lr=3e-4, amsgrad=True),
    clipper=nn.utils.clip_grad_norm_,
    clipper_params=dict(max_norm=1.0),
    scheduler_params=None,
)

# Keyword arguments for the Sane model, assembled from the hyper-parameter cell
# and the optimiser configuration above.
Sane_params = dict(
    optimizer_params=Optimizer_params,
    # One entry per position coordinate; the inputs below carry 3 coordinates
    # per token, each drawn from [0, 500).
    max_positions=[500, 500, 500],
    input_dim=input_dim,
    latent_dim=latent_dim,
    projection_dim=projection_dim,
    embed_dim=embed_dim,
    num_heads=num_heads,
    num_layers=num_layers,
    context_size=context_size,
    dropout=dropout,
    bias=bias,
    device=device,
    # NOTE(review): gamma / ntx_temp look like loss hyper-parameters (loss mixing
    # weight and contrastive temperature) — confirm against deeppy's Sane.
    gamma=0.5,
    ntx_temp=0.1,
)

model = dp.cv.Sane(**Sane_params)

# Inputs

In [4]:
cout = context_size
cr = input_dim


def random_attention_mask(num_masks, seq_len):
    """Return random additive attention masks of shape (num_masks, seq_len, seq_len).

    Entries are 0.0 (position may be attended) or -inf (position is blocked),
    i.e. the same float encoding that ``torch.log(torch.randint(0, 2, ...))``
    produces. Unlike that expression, the diagonal is always left at 0.0 so
    that no query row is blocked entirely: a row made only of -inf can turn
    the attention softmax into NaN and poison every downstream activation.
    """
    keep = torch.randint(0, 2, size=(num_masks, seq_len, seq_len)).bool()
    # Every token may always attend to itself (the eye broadcasts over dim 0).
    keep |= torch.eye(seq_len, dtype=torch.bool)
    return torch.zeros(num_masks, seq_len, seq_len).masked_fill(~keep, float("-inf"))


# First synthetic sample: token features, one attention mask per (sample, head),
# and 3 integer position coordinates per token, each in [0, 500).
tokenized_input = torch.rand(size = (batch_size, cout, cr)).to(device)
mask = random_attention_mask(batch_size*num_heads, cout).to(device)
positions = torch.randint(0,500, size = (batch_size,cout,3)).to(device)

# Second, independently drawn sample with the same shapes; it is used as the
# second half of the batch in the train / test cells.
tokenized_input2 = torch.rand(size = (batch_size, cout, cr)).to(device)
mask2 = random_attention_mask(batch_size*num_heads, cout).to(device)
positions2 = torch.randint(0,500, size = (batch_size,cout,3)).to(device)

print("Assume that layer of a NN is already flattened and the following tensor\n is batch_size x cout x cr")


print(f"Inp shape : {tokenized_input.shape}")
print(f"mask shape : {mask.shape}")
print(f"positions shape : {positions.shape}")

Assume that layer of a NN is already flattened and the following tensor
 is batch_size x cout x cr
Inp shape : torch.Size([32, 15, 20])
mask shape : torch.Size([128, 15, 15])
positions shape : torch.Size([32, 15, 3])


# Autoencoder

In [5]:
# Display the autoencoder's module tree. The bare expression on the last line
# of the cell is what makes the notebook render its repr.
model.autoencoder

Network(
  (model): Sequential(
    (0): Linear(in_features=20, out_features=64, bias=True)
    (1): SanePositionalEmbedding(
      (pe1): Embedding(500, 32)
      (pe2): Embedding(500, 32)
      (pe3): Embedding(500, 32)
    )
    (2): Dropout(p=0.1, inplace=False)
    (3): TransformerEncoder(
      (layers): ModuleList(
        (0-3): 4 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=False)
          )
          (linear1): Linear(in_features=64, out_features=256, bias=False)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=256, out_features=64, bias=False)
          (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
          (activation): GELU(

## Encode

In [6]:
# Encode the first sample: the encoder takes (tokens, positions, attention mask)
# as a single tuple and returns one latent vector per token.
encoder_batch = (tokenized_input, positions, mask)
latent = model.encode(encoder_batch)
print(f"Latent space : {latent.shape}")

Latent space : torch.Size([32, 15, 128])


In [7]:
# Manual walk through the encoder half of the autoencoder, one layer at a time,
# mirroring what model.encode does.

# Layer 0: linear token embedding (input_dim -> embed_dim).
T = model.autoencoder.model[0](tokenized_input)
print(f"Tokenized input : {T.shape}")

# Layers 1-2: positional embedding followed by dropout.
Tp = model.autoencoder.model[1](T,positions)
Tp = model.autoencoder.model[2](Tp)
print(f"Position encoding + dropout : {Tp.shape}")

# Layer 3: transformer encoder, using the additive attention mask.
Tr = model.autoencoder.model[3](Tp, mask)
print(f"After transformer encoder : {Tr.shape}")

# Layer 4 maps to the latent width. It must consume the transformer output
# (Tr); feeding Tp here would silently skip the transformer, and because both
# tensors have the same shape the printed sizes would not reveal it.
latent = model.autoencoder.model[4](Tr)
print(f"Latent space : {latent.shape}")

Tokenized input : torch.Size([32, 15, 64])
Position encoding + dropout : torch.Size([32, 15, 64])
After transformer encoder : torch.Size([32, 15, 64])
Latent space : torch.Size([32, 15, 128])


## Decode

In [8]:
# Decode the latent tokens back to input space; the decoder takes
# (latent, positions, attention mask) as a single tuple.
decoder_batch = (latent, positions, mask)
z = model.decode(decoder_batch)
print(f"Output : {z.shape}")

Output : torch.Size([32, 15, 20])


In [9]:
# Manual walk through the decoder half (layers 5-9 of the autoencoder),
# mirroring what model.decode does.
decoder_layers = model.autoencoder.model

# Layer 5: bring the latent tokens back to the embedding width.
T = decoder_layers[5](latent)
print(f"Decoder compression : {T.shape}")

# Layers 6-7: positional embedding, then the following layer
# (dropout, per the print label below).
T = decoder_layers[7](decoder_layers[6](T, positions))
print(f"Decoder position encoding + dropout : {T.shape}")

# Layer 8: decoder transformer, using the same attention mask.
T = decoder_layers[8](T, mask)
print(f"Decoder transformer : {T.shape}")

# Layer 9: output head back to the input width.
z = decoder_layers[9](T)
print(f"Output : {z.shape}")

Decoder compression : torch.Size([32, 15, 64])
Decoder position encoding + dropout : torch.Size([32, 15, 64])
Decoder transformer : torch.Size([32, 15, 64])
Output : torch.Size([32, 15, 20])


## Autoencoder Full Pass

In [10]:
# Full forward pass. The model returns three values; only the second one (y)
# is inspected here, and it has the same shape as the input tokens.
# NOTE(review): z / zp are presumably the latent and its projection — confirm.
forward_outputs = model((tokenized_input, positions, mask))
z, y, zp = forward_outputs
print(f"Output : {y.shape}")

Output : torch.Size([32, 15, 20])


# Projection Head

In [11]:
# Display the projection head's module tree. The bare expression on the last
# line of the cell is what makes the notebook render its repr.
model.project

Network(
  (model): Sequential(
    (0): SqueezeLastDimention()
    (1): Linear(in_features=1920, out_features=10, bias=False)
    (2): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
    (3): ReLU()
    (4): Linear(in_features=10, out_features=10, bias=False)
    (5): LayerNorm((10,), eps=1e-05, elementwise_affine=True)
    (6): ReLU()
  )
)

In [12]:
# Encode the first sample, then push the per-token latents through the
# projection head, which yields one vector per sample.
encoder_batch = (tokenized_input, positions, mask)
latent = model.encode(encoder_batch)
p = model.project(latent)

print(f"Latent size : {latent.shape}")
print(f"Projection head output size : {p.shape}")

Latent size : torch.Size([32, 15, 128])
Projection head output size : torch.Size([32, 10])


# Train a Batch

In [13]:
# Fresh 0/1 integer masks with the same shape as the token tensors.
# NOTE: this rebinds `mask` / `mask2`, which held the float attention masks of
# shape (batch_size*num_heads, cout, cout) in the cells above — re-run the
# "Inputs" cell before re-running any earlier cell.
# The masks are moved to `device` like every other tensor in the batch, so the
# batch does not mix CPU and GPU tensors when CUDA is available.
mask = torch.randint(0,2, size = tokenized_input.shape).to(device)
mask2 = torch.randint(0,2, size = tokenized_input2.shape).to(device)

# Training mode (enables dropout), then one optimisation step on the paired batch.
model.train()
batch = (tokenized_input,positions,mask,tokenized_input2,positions2,mask2)
loss = model.optimize(batch)
print(f"Loss train : {loss}")

Loss train : 3.219444751739502


# Test a Batch

In [14]:
# Fresh 0/1 integer masks with the same shape as the token tensors, moved to
# `device` like every other tensor in the batch.
mask = torch.randint(0,2, size = tokenized_input.shape).to(device)
mask2 = torch.randint(0,2, size = tokenized_input2.shape).to(device)

# Evaluation mode (disables dropout), then compute the loss on the paired batch.
model.eval()
batch = (tokenized_input,positions,mask,tokenized_input2,positions2,mask2)
loss = model.test(batch)
# This is the evaluation loss; the label previously read "Loss train".
print(f"Loss test : {loss}")

Loss train : 2.420185089111328
