In [1]:
## the structor is ResNet50 -> Transformation -> Position Encoding -> Transformer Encoder-> MLP head
## (B , T , 3 , 224 , 224) -> (B , T , 1024 , 14 , 14) -> (B, T , 2048, 7 , 7) 
## -> (B * T , 2048 , 1 , 1) 
## -> (B*T , 2048 ) -> (B*T , 256) -> (B , T , 256) -> (B , T , 256) -> (B , T , 256) -> (B , T , 256) -> (B*T , 256) 
## -> (B * T , 128) -> (B , T , 128)

In [157]:
import torch
import torchvision.models as models
import torch.nn as nn
import math
class ResNet50(nn.Module):
    """
    The resnet50 layer in the carl paper, the resnet is used to extract feature. We freeze all layers prior to -3 
        then train the following layers for our use.
    """
    def __init__(self):
        """
        Download the pretrain model.
        Specify layers to use
        """
        super().__init__()
        self.model = models.resnet50(pretrained=True)
        self.backbone = nn.Sequential(*list(self.model.children())[:-3])
        self.finetune_layer = nn.Sequential(*list(self.model.children())[-3])
        
        self.resnet_pool = nn.AdaptiveAvgPool2d(1)
    def forward(self,x):
        """
        Use Resnet 50 to extract per-frame features.
        -------
        Input:
            x: (B , T , 3 , 224 , 224)
        Output:
            out: (B , T , 2048) (The output dimension is the same as the -2 layer of ResNet, the only different
            is that we finetune layers between ResNet[-3:-1])
        """
        B , T , C , W , H = x.shape
        frames_per_batch = 25 ## Configuration of how many frames resnet can take once.
        num_blocks = int(math.ceil(float(T) / frames_per_batch))
        output = []
        for i in range(num_blocks): 
            ## make sure the boundary case is considered
            if (i+1)*frames_per_batch > T:
                processing = x[:, i*frames_per_batch:]
            else:
                processing = x[:, i*frames_per_batch:(i+1)*frames_per_batch]
            print(processing.shape)
            processing = processing.contiguous().view(-1,C,W,H)
            print(processing.shape)
            ## feed into ResNet
            self.backbone.eval()

            with torch.no_grad():
                resnet_frame = self.backbone(processing)
            ## append finetune part
            finetune_frame = self.finetune_layer(resnet_frame)
            
            processing = finetune_frame.contiguous().view(B,-1,2048,7,7)
            
            output.append(processing)
        x = torch.cat(output,dim=1)
        x = self.resnet_pool(x)
        x = x.flatten(start_dim=2)
        return x
            
r = ResNet50()

In [151]:
from torchvision.io import read_video
from torchvision.transforms import Resize,functional
x,_,_ = read_video("/home/c1l1mo/datasets/ACM_skating/Axel_and_Axel_com/467204328706015300.mp4")
y,_,_ = read_video("/home/c1l1mo/datasets/ACM_skating/Axel_and_Axel_com/467204328706015300.mp4")
x = torch.tensor(torch.cat((x.unsqueeze(0),y.unsqueeze(0)),dim=0)).float()
x = x[:,:,:224,:224]
x = x.permute(0,1,4,2,3)
x.shape

  """


torch.Size([2, 66, 3, 224, 224])

In [158]:
resnet_out = r(x)
print(resnet_out.shape)

torch.Size([2, 25, 3, 224, 224])
torch.Size([50, 3, 224, 224])
torch.Size([2, 25, 3, 224, 224])
torch.Size([50, 3, 224, 224])
torch.Size([2, 16, 3, 224, 224])
torch.Size([32, 3, 224, 224])
torch.Size([2, 66, 2048])


In [199]:
class Transformation(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc_layer = []
        
        inchannel = 2048
        for layer in range(2):
            self.fc_layer.append(nn.Dropout(0.1))
            self.fc_layer.append(nn.Linear(inchannel,512))
            self.fc_layer.append(nn.BatchNorm1d(512))
            self.fc_layer.append(nn.ReLU(True))
            inchannel = 512
        self.fc_layer = nn.Sequential(*self.fc_layer)
        
        self.video_emb = nn.Linear(512,256)
    def forward(self,x):
        B , T , R = x.shape
        x = x.view(-1,R)
        x = self.fc_layer(x)
        
        x = self.video_emb(x)
        x = x.view(B,T , x.shape[1])
        return x
    
t = Transformation()

In [200]:
transformed = t(resnet_out)
transformed.shape

torch.Size([2, 66, 256])

In [223]:
import numpy as np
class PositionalEncoder(nn.Module):
    def __init__(self):
        super(PositionalEncoder,self).__init__() ## There is NO DIFFERENCES b/t super() and super(className,self) AFTER python3
        self.drop_out = nn.Dropout(0.1)
    def generate_position_encoding(self,seq_len,d_model):
        """
        Position Encoding:
            Generate multiple seq, the dimesion will be (seq_len,d_model)
            For even number of dimension "d_model", generate sin wave.
            For odd numbers, generate cosine wave.
        """
        pos_matrix = np.zeros((seq_len,d_model))
        for pos in range(seq_len):
            for i in np.arange(d_model/2):
                pos_matrix[pos,int(2*i)] = np.sin(pos / 10000**(2*i / d_model))
                pos_matrix[pos,int(2*i)+1] = np.cos(pos / 10000**(2*i / d_model))
        return torch.from_numpy(pos_matrix).unsqueeze(0)
        
    def forward(self,x):
        B , T , D = x.shape
        pos_matrix = self.generate_position_encoding(T,D)
        print(x.shape,pos_matrix.shape)
        x = x + pos_matrix.type_as(x)
        x = self.drop_out(x)
        return x

PE = PositionalEncoder()
pe = PE(transformed)
pe.shape

torch.Size([2, 66, 256]) torch.Size([1, 66, 256])


torch.Size([2, 66, 256])

In [221]:
class EncodingLayer(nn.Module):
    """
    Transformer's encoding layer, in each layer it will do attention mechanism and feedforward
    """
    def __init__(self):
        super().__init__()