In [2]:
# !pip install pytorch-lightning

Collecting pytorch-lightning
  Downloading pytorch_lightning-2.0.2-py3-none-any.whl (719 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m719.0/719.0 kB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Collecting torchmetrics>=0.7.0
  Downloading torchmetrics-0.11.4-py3-none-any.whl (519 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting lightning-utilities>=0.7.0
  Downloading lightning_utilities-0.8.0-py3-none-any.whl (20 kB)
Installing collected packages: lightning-utilities, torchmetrics, pytorch-lightning
Successfully installed lightning-utilities-0.8.0 pytorch-lightning-2.0.2 torchmetrics-0.11.4


In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

import pytorch_lightning as pl

  from .autonotebook import tqdm as notebook_tqdm


In [64]:

class MLP(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        
        self.linear1 = nn.Linear(d_model, d_model//4)
        self.layer_norm1 = nn.LayerNorm(d_model//4)
        self.linear2 = nn.Linear(d_model//4,  d_model//16)
        self.layer_norm2 = nn.LayerNorm(d_model//16)
        self.linear3 = nn.Linear(d_model//16, d_model//32)
        self.layer_norm3 = nn.LayerNorm(d_model//32)
        self.linear4 = nn.Linear(d_model//32, 4)

        self.dropout = nn.Dropout(dropout)




    def forward(self, x):

        x = self.layer_norm1(self.dropout(self.linear1(x)))
        x = self.layer_norm2(self.dropout(self.linear2(x)))
        x = self.layer_norm3(self.dropout(self.linear3(x)))
        x = self.linear4(x)

        return x


class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward, dropout=0.1):
        super().__init__()
        
        # embedding matching
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)

        # feedforward
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)


        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        # activation
        self.activation = nn.GELU()
        
    def forward(self, text_emb, box_emb):

        # https://arxiv.org/pdf/2002.04745.pdf
        # here we propose the original transformer encoder layer
        # however, we designed the architecture in this way
        # as the authors of the paper did to improve the convergence


        # Add & Norm
        text_emb = text_emb + self.dropout1(text_emb)
        text_emb = self.norm1(text_emb)

        box_emb = box_emb + self.dropout1(box_emb)
        box_emb = self.norm1(box_emb)

        # print(text_emb.shape, box_emb.shape)

        # embedding matching
        x , _ = self.self_attn(box_emb, text_emb, box_emb)

        print(x.shape, box_emb.shape)
        
        # Add & Norm
        x = box_emb + self.dropout2(x)
        x = self.norm2(x)

        # feedforward
        x = self.linear2(self.dropout(self.activation(self.linear1(x))))

        return x




class TransformerEncoder(nn.Module):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        
        # self.encoder_block = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
        # self.transformer_encoder = nn.TransformerEncoder(self.encoder_block, num_layers)

        self.transformer_encoder = [TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_layers)]


    def forward(self, text_emb, box_emb):

        # # matching between the text and the first box
        # x0 = self.transformer_encoder(text_emb, box_emb[:,0,:].unsqueeze(1))
        # # matching between the text and the second box
        # x1 = self.transformer_encoder(text_emb, box_emb[:,1,:].unsqueeze(1))
        
        # # concatenate the two boxes
        # # shape: (batch_size, 2, d_model)
        # x = torch.cat([x0, x1], axis=1)

        
        x0 = text_emb
        x1 = text_emb

        for layer in self.transformer_encoder:
            # matching between the text and the first box
            x0 = layer(x0, box_emb[:,0,:].unsqueeze(1))
            # matching between the text and the second box
            x1 = layer(x1, box_emb[:,1,:].unsqueeze(1))

        # concatenate the two boxes
        # shape: (batch_size, 2, d_model)
        x = torch.cat([x0, x1], axis=1)

        return x
    


class BoxRegressor(nn.Module):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        
        self.transformer_encoder = TransformerEncoder(d_model, nhead, num_layers, dim_feedforward, dropout)
        self.mlp_regressor = MLP(1034, dropout)
        self.flatten = nn.Flatten(start_dim=1)


    def forward(self, text_encoding, box_encoding, box_coords):

        # compute the similarity matrix between the text and the boxes encoding
        similarity_matrix = torch.bmm(text_encoding.permute(0, 2, 1), box_encoding)

        # get the index of the top two boxes with the highest score
        top2_indices = torch.topk(similarity_matrix, k=2, dim=-1).indices.squeeze(1)
        top2 = torch.topk(similarity_matrix, k=2, dim=-1).indices.squeeze(1)


        # permute the dimensions to get the top two boxes
        box_encoding = box_encoding.permute(0, 2, 1)
        # get the top two boxes
        top2_boxes = box_encoding[torch.arange(box_encoding.shape[0]).unsqueeze(1), top2_indices]
        # print(top2_boxes.shape)

        # get the top two boxes coordinates
        top2_boxes_coords = box_coords[torch.arange(box_encoding.shape[0]).unsqueeze(1), top2_indices]
        # print(top2_boxes_coords.shape)

        # compute the matching between the text and the top two boxes
        out_matching = self.transformer_encoder(text_encoding.permute(0, 2, 1), top2_boxes)

        # concatenate the matching score with the top two boxes coordinates
        matching_score = torch.cat([top2.unsqueeze(2), top2_boxes_coords, out_matching], axis=-1)

        return self.mlp_regressor(self.flatten(matching_score))
    


class Net(pl.LightningModule):
    def __init__(self, d_model, nhead, num_layers, dim_feedforward, dropout=0.1):
        super().__init__()
        
        self.box_regressor = BoxRegressor(d_model, nhead, num_layers, dim_feedforward, dropout)

    def forward(self, text_encoding, box_encoding, box_coords):
        return self.box_regressor(text_encoding, box_encoding, box_coords)

    def training_step(self, batch, batch_idx):
        # training_step defined the train loop.
        # It is independent of forward
        text_encoding, box_encoding, box_coords, labels = batch
        out = self(text_encoding, box_encoding, box_coords)
        loss = F.mse_loss(out, labels)
        # Logging to TensorBoard by default
        self.log('train_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        text_encoding, box_encoding, box_coords, labels = batch
        out = self(text_encoding, box_encoding, box_coords)
        loss = F.mse_loss(out, labels)
        # Logging to TensorBoard by default
        self.log('val_loss', loss)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=1e-3)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
        return [optimizer], [scheduler]
        


        

import torch

torch.manual_seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

text_encoding = torch.randn(10000, 512, 1).to(device)
box_encoding = torch.randn(10000, 512, 10).to(device)
box_coords = torch.randn(10000, 10, 4).to(device)

target_boxes = torch.randn(10000, 1, 4).to(device)




# get dataset
from torch.utils.data import DataLoader, TensorDataset

batch_size = 256

dataset = TensorDataset(text_encoding, box_encoding, box_coords, target_boxes)
train_loader = DataLoader(dataset, batch_size=100, shuffle=True)


# init model
model = Net(512, 8, 2, 2048, 0.1).cuda()

# most basic trainer, uses good defaults
trainer = pl.Trainer(accelerator='auto', max_epochs=10)

# train the model
trainer.fit(model, train_loader)



# boxRegressor = BoxRegressor(512, 8, 2, 2048, 0.1)

# out = boxRegressor(text_encoding, box_encoding, box_coords)


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
  rank_zero_warn(
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name          | Type         | Params
-----------------------------------------------
0 | box_regressor | BoxRegressor | 286 K 
-----------------------------------------------
286 K     Trainable params
0         Non-trainable params
286 K     Total params
1.146     Total estimated model params size (MB)
  rank_zero_warn(


Epoch 0:   0%|          | 0/100 [00:00<?, ?it/s] 

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:0 and cpu! (when checking argument for argument weight in method wrapper_CUDA__native_layer_norm)

In [39]:

# # compute the similarity matrix between the text and the boxes encoding
# similarity_matrix = torch.bmm(text_encoding.permute(0, 2, 1), box_encoding)
# print(similarity_matrix.shape)


# # get the index of the top two boxes with the highest score
# top2_indices = torch.topk(similarity_matrix, k=2, dim=-1).indices.squeeze(1)
# top2 = torch.topk(similarity_matrix, k=2, dim=-1).indices.squeeze(1)
# print(top2_indices.shape)

# # permute the dimensions to get the top two boxes
# box_encoding = box_encoding.permute(0, 2, 1)
# # get the top two boxes
# top2_boxes = box_encoding[torch.arange(box_encoding.shape[0]).unsqueeze(1), top2_indices]
# print(top2_boxes.shape)

# # get the top two boxes coordinates
# top2_boxes_coords = box_coords[torch.arange(box_encoding.shape[0]).unsqueeze(1), top2_indices]
# print(top2_boxes_coords.shape)



# # zip(top2_boxes_coords[:,0,:],top2_boxes_coords[:,1,:])


# # shape (torch.Size([64, 1, 512]), torch.Size([64, 1, 512]))
# # zip(top2_boxes[:,0,:], top2_boxes[:,1,:])
# # shape (torch.Size([64, 1, 512]), torch.Size([64, 1, 512]))
# # zip(text_encoding.permute(0, 2, 1), text_encoding.permute(0, 2, 1))



# print(out_matching.shape)


# # add one element to the last dimension
# # which encode the score of the box
# print(top2.unsqueeze(2).shape, out_matching.shape)



# mlp = MLP(517, 0.1)

# print(matching_score.shape)

# print(pred_box.shape)

# # for text_emb, box_emb in zip([_text_emb_[:,0,:], _box_emb_],[_text_emb_[:,1,:], _box_emb_]):

# # text_encoding.shape, top2_boxes.shape

torch.Size([64, 2, 512])