In [2]:
import torch
from torch import nn
from torchvision.models import resnet50
from torchvision.models.detection import FasterRCNN
import torch.nn.functional as F
import sys
sys.path.append('core')
from flownet import FlowNetS

import argparse
import os
import cv2
import glob
import numpy as np
import torch
from PIL import Image

from raft import RAFT
from utils import flow_viz
from utils.utils import InputPadder

In [3]:
# Device on which every model and tensor in this notebook is placed.
# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA instead of failing at the first `.to(DEVICE)`.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [4]:
# ResNet-50 backbone created with torchvision's pretrained weights, then moved
# to the notebook's device.
# NOTE(review): recent torchvision releases deprecate `pretrained=` in favour of
# `weights=` -- confirm against the installed version before switching.
resnet = resnet50(pretrained=True)
resnet = resnet.to(DEVICE)



In [15]:
def feature_warp(f_k : torch.Tensor, flow : torch.Tensor):
    """Warp a feature map along a flow field using bilinear sampling.

    The flow is resized to the spatial size of ``f_k`` and negated; the output at
    pixel ``(py, px)`` is then ``f_k`` sampled bilinearly at
    ``(py - flow_y, px - flow_x)``. Samples that fall outside the map contribute
    zeros. A zero flow therefore returns ``f_k`` unchanged.

    Args:
        f_k: feature map of shape (n, c, h, w).
        flow: flow field of shape (n, 2, H, W); channel 0 is the displacement
            along x (width) and channel 1 the displacement along y (height).

    Returns:
        Tensor of shape (n, c, h, w) on the same device as ``f_k``.
    """
    n, c, h, w = f_k.shape

    # NOTE(review): the flow is only resized here; its values are used directly as
    # feature-map pixel offsets and are not rescaled by the resize ratio -- confirm
    # this matches the units produced by the flow network.
    flo = -F.interpolate(flow, size=(h, w), mode='bilinear', align_corners=False)
    flo = flo.to(device=f_k.device, dtype=f_k.dtype)

    # Absolute sampling position of every output pixel, in pixel coordinates.
    ys = torch.arange(h, device=f_k.device, dtype=f_k.dtype).view(1, h, 1)
    xs = torch.arange(w, device=f_k.device, dtype=f_k.dtype).view(1, 1, w)
    sample_x = xs + flo[:, 0]
    sample_y = ys + flo[:, 1]

    # grid_sample expects (x, y) in [-1, 1]; with align_corners=True, -1 and 1 map to
    # the centres of the first and last pixel. max(..., 1) avoids dividing by zero
    # for maps that are a single pixel wide or tall.
    grid = torch.stack(
        [
            2 * sample_x / max(w - 1, 1) - 1,
            2 * sample_y / max(h - 1, 1) - 1,
        ],
        dim=-1,
    )

    # One bilinear lookup for the whole map. Each of the four neighbouring pixels is
    # weighted by (1 - |distance|) along each axis, so the nearest corner gets the
    # largest weight.
    return F.grid_sample(f_k, grid, mode='bilinear', padding_mode='zeros', align_corners=True)

def feature_aggregation(frames : torch.Tensor, feature_encoder : nn.Module, flow_net : nn.Module, feature_embedding : nn.Module, K = 10):
    """Aggregate each frame's feature map with flow-warped maps of its temporal neighbours.

    For every frame ``i``, the feature maps of frames ``j`` in ``[i - K, i + K]``
    (clipped to the batch, ``j == i`` included) are warped with ``feature_warp``
    and combined in a weighted average. The weight of neighbour ``j`` is
    ``exp(cosine similarity)`` between the embedding of its warped map and the
    embedding of frame ``i``'s own map, normalised over the window.

    Args:
        frames: batch of frames indexed along dim 0; frame pairs are concatenated
            along dim 1 before being passed to ``flow_net``.
        feature_encoder: maps ``frames`` to a 4-D batch of feature maps (N, C, h, w).
        flow_net: maps a channel-concatenated frame pair ``[frame_j, frame_i]`` to a
            flow tensor accepted by ``feature_warp``.
        feature_embedding: maps a single feature map to its embedding.
        K: temporal radius of the aggregation window.

    Returns:
        Tensor with one aggregated feature map per input frame, concatenated along dim 0.
    """
    feature_maps = feature_encoder(frames)
    N = feature_maps.shape[0]
    if N == 0:
        # Nothing to aggregate; torch.cat on an empty list would raise.
        return feature_maps

    f_i_aggregation_list = []
    for i in range(N):
        # The reference embedding depends only on i, so compute it once per frame
        # instead of once per neighbour.
        f_i_emb = feature_embedding(feature_maps[i:i+1])

        w_list = []
        f_list = []
        for j in range(max(0, i - K), min(N, i + K + 1)):
            pad_frames = torch.cat([frames[j:j+1], frames[i:i+1]], dim=1)
            flow_ji = flow_net(pad_frames)
            # 1, c, h, w
            f_ji = feature_warp(feature_maps[j:j+1], flow_ji)
            # 1, emb
            f_ji_emb = feature_embedding(f_ji)
            # Scalar cosine similarity over the whole embedding; cosine_similarity
            # clamps the norms with a small eps, so an all-zero embedding yields 0
            # instead of NaN.
            similarity = F.cosine_similarity(f_ji_emb.flatten(), f_i_emb.flatten(), dim=0)
            # 1, 1, 1, 1 -- broadcasts over (c, h, w) in the weighted sum below.
            w_ji = torch.exp(similarity).reshape(1, 1, 1, 1)
            f_list.append(f_ji)
            w_list.append(w_ji)
        # window, c, h, w
        f = torch.cat(f_list, dim=0)
        # window, 1, 1, 1
        w = torch.cat(w_list, dim=0)
        # 1, c, h, w
        f_i_aggregation = torch.sum(f * (w / torch.sum(w)), dim=0, keepdim=True)
        f_i_aggregation_list.append(f_i_aggregation)

    return torch.cat(f_i_aggregation_list, dim=0)


class FeatureExtractor(nn.Module):
    """Backbone wrapper that runs every child of ``model`` except the last two.

    The retained children are the same module objects as in ``model`` (not copies),
    chained in their original order inside ``self.feature``.
    """

    def __init__(self, model : nn.Module) -> None:
        super().__init__()
        backbone_layers = list(model.children())
        # Drop the two trailing children (presumably the pooling and classifier
        # head of a classification network) and keep the rest.
        self.feature = nn.Sequential(*backbone_layers[:-2])

    def forward(self, x):
        """Return the output of the truncated backbone for ``x``."""
        return self.feature(x)
    
class Feature2Class(nn.Module):
    """Classification head that reuses ``model.avgpool`` and ``model.fc``.

    Takes a feature map, pools it, flattens everything after the batch
    dimension and applies the fully connected layer.
    """

    def __init__(self, model : nn.Module) -> None:
        super().__init__()
        # Both layers are shared with ``model`` rather than copied.
        self.avgpool = model.avgpool
        self.fc = model.fc

    def forward(self, x):
        """Return the output of ``fc`` for the pooled, flattened feature map ``x``."""
        pooled = self.avgpool(x)
        return self.fc(torch.flatten(pooled, start_dim=1))

class FeatureEmbedding(nn.Module):
    """Embed a feature map into one vector per sample.

    Three stacked convolutions (1x1 -> 3x3 -> 1x1, 512 hidden channels, spatial
    size preserved) are followed by global average pooling, giving an output of
    shape (n, out_channels).
    """

    def __init__(self, in_channels, out_channels) -> None:
        super().__init__()
        hidden_channels = 512
        layers = [
            nn.Conv2d(in_channels, hidden_channels, kernel_size=1, stride=1, padding=0),
            nn.Conv2d(hidden_channels, hidden_channels, kernel_size=3, stride=1, padding=1),
            nn.Conv2d(hidden_channels, out_channels, kernel_size=1, stride=1, padding=0),
        ]
        self.conv = nn.Sequential(*layers)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

    def forward(self, x : torch.Tensor):
        """Return the pooled embedding of ``x`` with the two 1x1 spatial dims removed."""
        pooled = self.avgpool(self.conv(x))
        # pooled is (n, out_channels, 1, 1); drop the two trailing singleton dims.
        return pooled.squeeze(-1).squeeze(-1)

In [17]:
# Truncated ResNet used as the per-frame feature encoder, in inference mode.
feature_encoder = FeatureExtractor(resnet).to(DEVICE).eval()

# Embedding network used for the similarity weights. No checkpoint is loaded in
# this cell, so it keeps its initial weights.
# NOTE(review): 2048 is presumably the encoder's output channel count -- confirm.
feature_embedding = FeatureEmbedding(2048, 2048).to(DEVICE).eval()

# Optical-flow network; no checkpoint is loaded in this cell either.
# NOTE(review): the positional False presumably disables batch norm -- confirm
# against FlowNetS's signature.
flow_net = FlowNetS(False).to(DEVICE).eval()

# Last expression of the cell: displays the flow network's module tree.
flow_net

FlowNetS(
  (conv1): Sequential(
    (0): Conv2d(6, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv3_1): Sequential(
    (0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv4_1): Sequential(
    (0): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (conv5): Sequential(
    (0): Conv2d(512, 512,

In [18]:
# Smoke test: aggregate features for 5 random frames of shape (3, 224, 224)
# with a temporal window radius of K = 3.
frames = torch.randn(5, 3, 224, 224).to(DEVICE)
# Inference only: disable autograd so the graph of every frame pair is not
# kept in memory.
with torch.no_grad():
    f = feature_aggregation(frames, feature_encoder, flow_net, feature_embedding, K = 3)

In [24]:
def f():
    """Return the fixed triple 1, 2, 3 packed in a tuple."""
    return (1, 2, 3)

# Several comma-separated return values arrive as a single tuple.
a = f()
a

(1, 2, 3)

In [25]:
# Converting a list to a tuple keeps the same elements in the same order.
tuple([1,2,3])

(1, 2, 3)

In [27]:
l1 = [1, 2, 3]
l2 = list('abc')
# zip pairs the two lists element by element; only the letter of each pair is printed.
for x, a in zip(l1, l2):
    print(a)

a
b
c
