In [None]:
!unzip /content/drive/MyDrive/dsg-bh-2y/CholecT50_Subset_Dsg.zip -d /content/drive/MyDrive/CholecT50_unzipped

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000361.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000290.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000853.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/001085.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000180.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000777.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000280.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000514.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000829.png  
  inflating: /content/drive/MyDrive/CholecT50_unzipped/CholecT50/videos/VID06/000127.png  
  inflating: /content/dri

In [None]:
!cp -r /content/dataset /content/drive/MyDrive

## Dataloader

In [None]:
import os
import random
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, ConcatDataset, DataLoader

In [None]:
class T50(Dataset):
    """Frame-level dataset for one video.

    Each label file is a comma-separated integer table with one row per frame:
    column 0 is the frame id (also used to build the image file name) and the
    remaining columns are the labels for that task.

    Args:
        img_dir: directory holding the frames, named ``<frame id zero-padded to 6>.png``.
        triplet_file, tool_file, verb_file, target_file, phase_file: label tables.
        transform: optional callable applied to the loaded PIL image.
        target_transform: optional callable applied to the triplet label only.
    """

    def __init__(self, img_dir, triplet_file, tool_file, verb_file, target_file, phase_file, transform=None, target_transform=None):
        self.triplet_labels = self._load_labels(triplet_file)
        self.tool_labels = self._load_labels(tool_file)
        self.verb_labels = self._load_labels(verb_file)
        self.target_labels = self._load_labels(target_file)
        self.phase_labels = self._load_labels(phase_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    @staticmethod
    def _load_labels(path):
        """Read a label table, always as a 2-D (rows, columns) integer array."""
        # ndmin=2 stops a single-row file from collapsing to 1-D, which would make
        # len() return the column count and break the [index, 1:] indexing below.
        return np.loadtxt(path, dtype=int, delimiter=',', ndmin=2)

    def __len__(self):
        """Number of labelled frames (rows of the triplet table)."""
        return len(self.triplet_labels)

    def __getitem__(self, index):
        """Return ``(image, (tool, verb, target, triplet, phase))`` for one frame."""
        triplet_label = self.triplet_labels[index, 1:]
        tool_label = self.tool_labels[index, 1:]
        verb_label = self.verb_labels[index, 1:]
        target_label = self.target_labels[index, 1:]
        phase_label = self.phase_labels[index, 1:]

        frame_id = self.triplet_labels[index, 0]
        basename = "{}.png".format(str(frame_id).zfill(6))
        img_path = os.path.join(self.img_dir, basename)
        # Load eagerly inside a context manager so the file handle is released, and
        # force 3 channels so palette/grayscale/RGBA PNGs still match the RGB Normalize.
        with Image.open(img_path) as img:
            image = img.convert("RGB")

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            triplet_label = self.target_transform(triplet_label)
        return image, (tool_label, verb_label, target_label, triplet_label, phase_label)

In [None]:
class CholecT50():
    """Builds one ``T50`` dataset per video for a fixed train/test video split.

    Label files are read from ``<dataset_dir>/{triplet,instrument,verb,target,phase}/<VIDxx>.txt``.
    Frames are read from ``<dataset_dir>/data/<VIDxx>``; when that folder is missing but
    ``<dataset_dir>/videos/<VIDxx>`` exists, the latter is used instead.

    Args:
        dataset_dir: root directory of the dataset.
    """

    def __init__(self,
                 dataset_dir):
        self.dataset_dir = dataset_dir
        train_videos = [1, 2, 4, 5, 6, 8, 10, 12, 13, 14]
        test_videos = [92, 96, 103, 110, 111]
        self.train_records = ['VID{}'.format(str(v).zfill(2)) for v in train_videos]
        self.test_records  = ['VID{}'.format(str(v).zfill(2)) for v in test_videos]
        trainform, testform = self.transform()
        self.build_train_dataset(trainform)
        self.build_test_dataset(testform)

    def no_augumentation(self, x):
        """Identity transform (returns ``x`` unchanged)."""
        return x

    def transform(self):
        """Return ``(trainform, testform)``: resize to 256x448, to tensor, normalize."""
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        op_test   = [transforms.Resize((256, 448)), transforms.ToTensor(), normalize]
        op_train  = [transforms.Resize((256, 448)), transforms.ToTensor(), normalize]
        testform  = transforms.Compose(op_test)
        trainform = transforms.Compose(op_train)
        return trainform, testform

    def _frame_dir(self, video):
        """Locate the frame folder of ``video``, preferring ``data/`` over ``videos/``."""
        default_dir = os.path.join(self.dataset_dir, 'data', video)
        fallback_dir = os.path.join(self.dataset_dir, 'videos', video)
        # Only switch when the default layout is absent AND the alternative exists,
        # so existing ``data/`` layouts behave exactly as before.
        if not os.path.isdir(default_dir) and os.path.isdir(fallback_dir):
            return fallback_dir
        return default_dir

    def _label_file(self, task, video):
        """Path of the label table of ``task`` for ``video``."""
        return os.path.join(self.dataset_dir, task, '{}.txt'.format(video))

    def _build_video_datasets(self, records, transform):
        """Create one ``T50`` per video id in ``records``, all sharing ``transform``."""
        return [
            T50(img_dir=self._frame_dir(video),
                triplet_file=self._label_file('triplet', video),
                tool_file=self._label_file('instrument', video),
                verb_file=self._label_file('verb', video),
                target_file=self._label_file('target', video),
                phase_file=self._label_file('phase', video),
                transform=transform)
            for video in records
        ]

    def build_train_dataset(self, transform):
        """Populate ``self.train_dataset`` with one dataset per training video."""
        self.train_dataset = self._build_video_datasets(self.train_records, transform)

    def build_test_dataset(self, transform):
        """Populate ``self.test_dataset`` with one dataset per test video."""
        self.test_dataset = self._build_video_datasets(self.test_records, transform)

    def build(self):
        """Return ``(train_dataset, test_dataset)``, each a list of per-video datasets."""
        return (self.train_dataset, self.test_dataset)

## Architecture

In [4]:
import os
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F
import torchvision.models as basemodels
import torchvision.transforms as transforms

In [5]:
# Spatial size (height, width) of the backbone's final feature map. MHMA and FFN read
# these to size their LayerNorm when use_ln=True, and BaseModel.increase_resolution()
# scales them 4x for the high-resolution variant.
# NOTE(review): 8x14 presumably matches 256x448 inputs at ResNet-18's overall stride of 32 — confirm.
OUT_HEIGHT = 8
OUT_WIDTH  = 14

In [6]:
# Feature extraction backbone
class BaseModel(nn.Module):
    """ResNet-18 backbone exposing two intermediate activations through forward hooks.

    The hooks capture the output of ``layer1[1].bn2`` ("low_level_feature") and
    ``layer4[1].bn2`` ("high_level_feature") on every forward pass.

    Args:
        hr_output: when True, remove the stride-2 downsampling of layer3 and layer4
            (4x larger final feature map) and update OUT_HEIGHT / OUT_WIDTH to match.
        *args: forwarded to ``nn.Module.__init__``.
    """

    # (OUT_HEIGHT, OUT_WIDTH) written by the most recent high-resolution rescale. Lets
    # increase_resolution() recognise globals that were already scaled, so building a
    # second high-resolution model (e.g. re-running a cell) does not multiply them again.
    _hr_out_size = None

    def __init__(self, hr_output=False, *args):
        super(BaseModel, self).__init__(*args)
        self.output_feature = {}
        self.basemodel = basemodels.resnet18(pretrained=True)
        if hr_output:
            self.increase_resolution()
        self.basemodel.layer1[1].bn2.register_forward_hook(self.get_activation('low_level_feature'))
        self.basemodel.layer4[1].bn2.register_forward_hook(self.get_activation('high_level_feature'))

    def increase_resolution(self):
        """Set layer3/layer4 strides to 1 and scale the global output size 4x (idempotently)."""
        global OUT_HEIGHT, OUT_WIDTH
        self.basemodel.layer3[0].conv1.stride = (1,1)
        self.basemodel.layer3[0].downsample[0].stride=(1,1)
        self.basemodel.layer4[0].conv1.stride = (1,1)
        self.basemodel.layer4[0].downsample[0].stride=(1,1)
        # Scale only when the globals still hold an unscaled value; if they already equal
        # the size written by a previous call, leave them alone.
        if (OUT_HEIGHT, OUT_WIDTH) != BaseModel._hr_out_size:
            BaseModel._hr_out_size = (OUT_HEIGHT * 4, OUT_WIDTH * 4)
        OUT_HEIGHT, OUT_WIDTH = BaseModel._hr_out_size
        print("using high resolution output ({}x{})".format(OUT_HEIGHT,OUT_WIDTH))


    def get_activation(self, layer_name):
        """Return a forward hook that stores the hooked module's output under ``layer_name``."""
        def hook(module, input, output):
            self.output_feature[layer_name] = output
        return hook

    def forward(self, x):
        """Run the full ResNet and return ``(high_level_feature, low_level_feature)``."""
        # The classifier output itself is discarded; only the hooked activations are used.
        _ = self.basemodel(x)
        return self.output_feature['high_level_feature'], self.output_feature['low_level_feature']


In [7]:
# Tool Weakly-Supervised localization
class Tool_WSL(nn.Module):
    """Weakly-supervised tool head: class activation maps plus max-pooled logits.

    Args:
        num_class: number of tool classes (channels of the activation map).
        depth: width of the intermediate 3x3 convolution.
    """

    def __init__(self, num_class, depth=64):
        super().__init__()
        self.conv1 = nn.Conv2d(512, depth, kernel_size=3, padding=1)
        self.cam   = nn.Conv2d(depth, num_class, kernel_size=1)
        self.elu   = nn.ELU()
        self.bn    = nn.BatchNorm2d(depth)
        self.gmp   = nn.AdaptiveMaxPool2d((1,1))

    def forward(self, x):
        """Return ``(cam, logits)``; logits are the spatial maximum of each CAM channel."""
        hidden = self.elu(self.bn(self.conv1(x)))
        cam = self.cam(hidden)
        pooled = self.gmp(cam)
        # Drop the two trailing singleton spatial axes left by the (1,1) pooling.
        logits = pooled[..., 0, 0]
        return cam, logits

In [8]:
class ConvLSTMCell(nn.Module):
    """Single convolutional LSTM cell.

    One convolution over the channel-wise concatenation of the input and the previous
    hidden state produces the four gate pre-activations (input, forget, output, candidate).

    Args:
        input_channels: channels of the input ``x``.
        hidden_channels: channels of the hidden and cell states.
        kernel_size: convolution kernel size; padding is ``kernel_size // 2``.
    """

    def __init__(self, input_channels, hidden_channels, kernel_size):
        super().__init__()
        self.input_channels = input_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size

        self.conv = nn.Conv2d(in_channels=input_channels + hidden_channels,
                              out_channels=4 * hidden_channels,
                              kernel_size=kernel_size,
                              padding=kernel_size // 2)

    def forward(self, x, h_cur, c_cur):
        """Advance one time step and return ``(h_next, c_next)``."""
        gates = self.conv(torch.cat((x, h_cur), dim=1))
        # Channel order of the gate chunks: input, forget, output, candidate.
        pre_i, pre_f, pre_o, pre_g = gates.split(self.hidden_channels, dim=1)

        in_gate = torch.sigmoid(pre_i)
        forget_gate = torch.sigmoid(pre_f)
        out_gate = torch.sigmoid(pre_o)
        candidate = torch.tanh(pre_g)

        c_next = forget_gate * c_cur + in_gate * candidate
        h_next = out_gate * torch.tanh(c_next)
        return h_next, c_next

class ConvLSTM2d(nn.Module):
    """Stack of ``ConvLSTMCell`` layers unrolled over a frame sequence.

    Args:
        input_channels: channels of each input frame.
        hidden_channels: channels of every layer's hidden/cell state.
        kernel_size: kernel size passed to every cell.
        num_layers: number of stacked cells.
        batch_first: if True the input/output layout is (batch, seq, C, H, W),
            otherwise (seq, batch, C, H, W).
    """

    def __init__(self, input_channels, hidden_channels, kernel_size, num_layers, batch_first=False):
        super(ConvLSTM2d, self).__init__()
        self.num_layers = num_layers
        self.batch_first = batch_first

        self.cells = nn.ModuleList()
        self.cells.append(ConvLSTMCell(input_channels, hidden_channels, kernel_size))
        for _ in range(1, num_layers):
            self.cells.append(ConvLSTMCell(hidden_channels, hidden_channels, kernel_size))

    def forward(self, x):
        """Return ``(outputs, (h_last, c_last))``.

        ``outputs`` holds the top layer's hidden state at every time step (same layout
        as the input); ``h_last`` / ``c_last`` are the top layer's final states.
        """
        if self.batch_first:
            x = x.permute(1, 0, 2, 3, 4)  # Change to (seq_len, batch, channels, height, width)

        seq_len, b, _, h, w = x.shape

        # new_zeros inherits dtype AND device from x, so half/double inputs get matching
        # initial states instead of float32 ones.
        h_t = [x.new_zeros(b, cell.hidden_channels, h, w) for cell in self.cells]
        c_t = [x.new_zeros(b, cell.hidden_channels, h, w) for cell in self.cells]

        output_inner = []
        for t in range(seq_len):
            x_t = x[t]
            for i, cell in enumerate(self.cells):
                h_t[i], c_t[i] = cell(x_t, h_t[i], c_t[i])
                x_t = h_t[i]  # each layer feeds the next one
            output_inner.append(h_t[-1])

        output_inner = torch.stack(output_inner, dim=0)

        if self.batch_first:
            output_inner = output_inner.permute(1, 0, 2, 3, 4)  # Back to (batch, seq_len, channels, height, width)

        return output_inner, (h_t[-1], c_t[-1])

class LSTMblock(nn.Module):
    """Two-layer ConvLSTM over 512-channel frame features, keeping only the last step.

    Args:
        hidden_channels: channels of the ConvLSTM state and of the returned map.
    """

    def __init__(self, hidden_channels):
        super().__init__()
        self.conv_lstm = ConvLSTM2d(
            input_channels=512,
            hidden_channels=hidden_channels,
            kernel_size=3,
            num_layers=2,
            batch_first=True,
        )
        self.bn = nn.BatchNorm2d(hidden_channels)
        self.activation = nn.ELU()

    def forward(self, x):
        """Map a (batch, seq, C, H, W) window to the normalised last-step feature map."""
        sequence_out, _ = self.conv_lstm(x)
        # The final time step corresponds to the current (most recent) frame of the window.
        current_step = sequence_out[:, -1]
        return self.activation(self.bn(current_step))


In [9]:
# Phase Weakly-Supervised localization
class Phase_WSL(nn.Module):
    """Phase head producing a 100-channel triplet map, phase logits and a 10-channel phase-gated map.

    Args:
        num_phase: number of phase classes.
        in_channels: channels of the incoming feature map.
        depth: width of the shared 3x3 convolution.
    """

    def __init__(self, num_phase, in_channels, depth=64):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, depth, kernel_size=3, padding=1)
        self.cam   = nn.Conv2d(depth, num_phase, kernel_size=1)
        self.verb_phase_cam= nn.Conv2d(num_phase, 10, kernel_size=1)
        self.triplet_cam= nn.Conv2d(depth, 100, kernel_size=1)
        self.elu   = nn.ELU()
        self.bn1    = nn.BatchNorm2d(depth)
        self.bn2    = nn.BatchNorm2d(num_phase)
        self.bn3    = nn.BatchNorm2d(100)
        self.gmp   = nn.AdaptiveMaxPool2d((1,1))

    def forward(self, x):
        """Return ``(triplet_cam, logits, verb_phase_cam)``."""
        shared = self.elu(self.bn1(self.conv1(x)))

        # Triplet branch: 1x1 projection of the shared features.
        triplet_map = self.elu(self.bn3(self.triplet_cam(shared)))

        # Phase branch: logits are the spatial maximum of the raw phase map.
        phase_map = self.cam(shared)
        peak = self.gmp(phase_map)
        logits = peak.squeeze(-1).squeeze(-1)

        # Re-weight the normalised phase map by its own per-class peak before projecting.
        weighted = self.elu(self.bn2(phase_map)) * peak
        return triplet_map, logits, self.verb_phase_cam(weighted)

In [10]:
# Class Activation Guided Attention Mechanism
class CAGAM(nn.Module):
    """Attention over backbone features guided by the tool class-activation map.

    Two parallel branches share the same recipe: derive query/key maps from both the
    contextualised features and the tool CAM, multiply the two affinity matrices
    element-wise, softmax, and use the result to re-weight the features.
    The verb branch builds a channel-by-channel affinity; the target branch builds a
    position-by-position affinity.

    Args:
        num_tool: number of tool classes; also the channel width of every internal map.
        num_verb: number of verb classes.
        num_target: number of target classes.
        in_depth: channels of the raw backbone feature map.
    """

    def __init__(self, num_tool, num_verb, num_target, in_depth=512):
        super(CAGAM, self).__init__()
        # All query/key/context maps use num_tool channels so they line up with the tool CAM.
        out_depth               = num_tool
        self.verb_context       = nn.Conv2d(in_channels=in_depth, out_channels=out_depth, kernel_size=3, padding=1)
        self.verb_query         = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.verb_tool_query    = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.verb_key           = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.verb_tool_key      = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.verb_tool_cmap     = nn.Conv2d(in_channels=out_depth, out_channels=num_verb, kernel_size=1)
        self.verb_cmap          = nn.Conv2d(in_channels=num_verb, out_channels=num_verb, kernel_size=1)
        self.target_context     = nn.Conv2d(in_channels=in_depth, out_channels=out_depth, kernel_size=3, padding=1)
        self.target_query       = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.target_tool_query  = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.target_key         = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.target_tool_key    = nn.Conv2d(in_channels=out_depth, out_channels=out_depth, kernel_size=1)
        self.target_cmap        = nn.Conv2d(in_channels=out_depth, out_channels=num_target, kernel_size=1)
        self.gmp       = nn.AdaptiveMaxPool2d((1,1))
        self.elu       = nn.ELU()
        self.soft      = nn.Softmax(dim=1)
        self.flat      = nn.Flatten(2,3)  # merges the two spatial axes: [B,C,H,W] -> [B,C,H*W]
        # bn1-bn6 and bn13 belong to the verb branch, bn7-bn12 to the target branch.
        self.bn1       = nn.BatchNorm2d(out_depth)
        self.bn2       = nn.BatchNorm2d(out_depth)
        self.bn3       = nn.BatchNorm2d(out_depth)
        self.bn4       = nn.BatchNorm2d(out_depth)
        self.bn5       = nn.BatchNorm2d(out_depth)
        self.bn6       = nn.BatchNorm2d(out_depth)
        self.bn7       = nn.BatchNorm2d(out_depth)
        self.bn8       = nn.BatchNorm2d(out_depth)
        self.bn9       = nn.BatchNorm2d(out_depth)
        self.bn10      = nn.BatchNorm2d(out_depth)
        self.bn11      = nn.BatchNorm2d(out_depth)
        self.bn12      = nn.BatchNorm2d(out_depth)
        self.bn13      = nn.BatchNorm2d(num_verb)
        # Learnable scalars scaling the attended features before the residual addition.
        self.encoder_cagam_verb_beta   = torch.nn.Parameter(torch.randn(1))
        self.encoder_cagam_target_beta = torch.nn.Parameter(torch.randn(1))

    def get_verb(self, raw, cam, phase_cam):
        """Verb branch: channel attention, then gating by ``phase_cam``.

        Args:
            raw: backbone feature map with ``in_depth`` channels.
            cam: tool class-activation map with ``num_tool`` channels.
            phase_cam: map multiplied element-wise with the ``num_verb``-channel verb
                map, so it must broadcast against it.

        Returns:
            ``(cmap, y)``: the verb activation map and its spatially max-pooled logits.
        """
        x  = self.elu(self.bn1(self.verb_context(raw)))
        z  = x.clone()
        sh = list(z.shape)
        sh[0] = -1  # let reshape infer the batch size
        q1 = self.elu(self.bn2(self.verb_query(x)))
        k1 = self.elu(self.bn3(self.verb_key(x)))
        # [B,C,HW] x [B,HW,C] -> [B,C,C]: affinity between feature channels
        w1 = self.flat(k1).matmul(self.flat(q1).transpose(-1,-2))
        q2 = self.elu(self.bn4(self.verb_tool_query(cam)))
        k2 = self.elu(self.bn5(self.verb_tool_key(cam)))
        # same channel affinity, computed from the tool CAM
        w2 = self.flat(k2).matmul(self.flat(q2).transpose(-1,-2))
        # combine both affinities; the scale is sqrt of the feature map WIDTH (sh[-1])
        attention = (w1 * w2) / torch.sqrt(torch.tensor(sh[-1], dtype=torch.float32))
        attention = self.soft(attention)  # softmax over dim=1
        v = self.flat(z)
        e = (attention.matmul(v) * self.encoder_cagam_verb_beta).reshape(sh)
        e = self.bn6(e + z)  # residual connection
        # Disabled alternative: gate `e` itself with phase_cam (e = e * phase_cam).
        # If the shapes disagree, print e.shape and phase_cam.shape to debug.
        verb_tool_cmap = self.verb_tool_cmap(e)
        f = verb_tool_cmap * phase_cam  # gate the verb map with the phase-derived map
        f = self.bn13(f)
        f = self.elu(f)
        cmap = self.verb_cmap(f)
        y = self.gmp(cmap).squeeze(-1).squeeze(-1)
        return cmap, y

    def get_target(self, raw, cam):
        """Target branch: position attention guided by the tool CAM.

        Args:
            raw: backbone feature map with ``in_depth`` channels.
            cam: tool class-activation map with ``num_tool`` channels; its number of
                spatial positions must equal that of ``raw`` for the element-wise product.

        Returns:
            ``(cmap, y)``: the target activation map and its spatially max-pooled logits.
        """
        x  = self.elu(self.bn7(self.target_context(raw)))
        z  = x.clone()
        sh = list(z.shape)
        sh[0] = -1  # let reshape infer the batch size
        q1 = self.elu(self.bn8(self.target_query(x)))
        k1 = self.elu(self.bn9(self.target_key(x)))
        # [B,HW,C] x [B,C,HW] -> [B,HW,HW]: affinity between spatial positions
        w1 = self.flat(k1).transpose(-1,-2).matmul(self.flat(q1))
        q2 = self.elu(self.bn10(self.target_tool_query(cam)))
        k2 = self.elu(self.bn11(self.target_tool_key(cam)))
        # same position affinity, computed from the tool CAM
        w2 = self.flat(k2).transpose(-1,-2).matmul(self.flat(q2))
        # combine both affinities; the scale is sqrt of the feature map WIDTH (sh[-1])
        attention = (w1 * w2) / torch.sqrt(torch.tensor(sh[-1], dtype=torch.float32))
        attention = self.soft(attention)  # softmax over dim=1
        v = self.flat(z)
        e = (v.matmul(attention) * self.encoder_cagam_target_beta).reshape(sh)
        e = self.bn12(e + z)  # residual connection
        cmap = self.target_cmap(e)
        y = self.gmp(cmap).squeeze(-1).squeeze(-1)
        return cmap, y

    def forward(self, x, cam, phase_cam):
        """Return ``((cam_v, logit_v), (cam_t, logit_t))`` for the verb and target branches."""
        cam_v, logit_v = self.get_verb(x, cam, phase_cam)
        cam_t, logit_t = self.get_target(x, cam)
        return (cam_v, logit_v), (cam_t, logit_t)

In [11]:
# Projection function
class Projection(nn.Module):
    """Projects the triplet map and the three component CAMs into query/key/value form.

    Keys (and the query) are vectors obtained from globally average-pooled maps via
    linear layers; values are feature maps obtained via 1x1 convolutions.

    Args:
        num_tool, num_verb, num_target: channels of the respective component CAMs.
        num_class: channels of the triplet map ``X``.
        out_depth: width of every projected query/key/value.
    """

    def __init__(self, num_tool=6, num_verb=10, num_target=15, num_class=100, out_depth=128):
        super().__init__()
        self.triplet_value = nn.Conv2d(num_class, out_depth, kernel_size=1)
        self.i_value   = nn.Conv2d(num_tool, out_depth, kernel_size=1)
        self.v_value   = nn.Conv2d(num_verb, out_depth, kernel_size=1)
        self.t_value   = nn.Conv2d(num_target, out_depth, kernel_size=1)
        self.triplet_query = nn.Linear(num_class, out_depth)
        self.dropout   = nn.Dropout(p=0.3)
        self.triplet_key   = nn.Linear(num_class, out_depth)
        self.i_key     = nn.Linear(num_tool, out_depth)
        self.v_key     = nn.Linear(num_verb, out_depth)
        self.t_key     = nn.Linear(num_target, out_depth)
        self.gap       = nn.AdaptiveAvgPool2d((1,1))
        self.elu       = nn.ELU()
        # Odd-numbered norms (plus bn2) act on pooled vectors, the remaining even ones on maps.
        self.bn1       = nn.BatchNorm1d(out_depth)
        self.bn2       = nn.BatchNorm1d(out_depth)
        self.bn3       = nn.BatchNorm2d(out_depth)
        self.bn4       = nn.BatchNorm1d(out_depth)
        self.bn5       = nn.BatchNorm2d(out_depth)
        self.bn6       = nn.BatchNorm1d(out_depth)
        self.bn7       = nn.BatchNorm2d(out_depth)
        self.bn8       = nn.BatchNorm1d(out_depth)
        self.bn9       = nn.BatchNorm2d(out_depth)

    def _pooled(self, feature_map):
        """Global-average-pool a map and drop the two singleton spatial axes."""
        return self.gap(feature_map).squeeze(-1).squeeze(-1)

    def forward(self, cam_i, cam_v, cam_t, X):
        """Return ``(X, (k1,v1), (k2,v2), (k3,v3), (q,k,v))``.

        ``X`` and the triplet value ``v`` are resized to the spatial size of the tool
        value map and passed through ELU; the pairs are (key, value) for tool, verb and
        target; the last tuple is the triplet query, key and value.
        """
        # Triplet map: only the query path sees dropout.
        q = self.elu(self.bn1(self.triplet_query(self.dropout(self._pooled(X)))))
        k = self.elu(self.bn2(self.triplet_key(self._pooled(X))))
        v = self.bn3(self.triplet_value(X))

        # Component CAMs, in the order tool, verb, target.
        tool_key = self.elu(self.bn4(self.i_key(self._pooled(cam_i))))
        tool_value = self.elu(self.bn5(self.i_value(cam_i)))
        verb_key = self.elu(self.bn6(self.v_key(self._pooled(cam_v))))
        verb_value = self.elu(self.bn7(self.v_value(cam_v)))
        target_key = self.elu(self.bn8(self.t_key(self._pooled(cam_t))))
        target_value = self.elu(self.bn9(self.t_value(cam_t)))

        # Bring the triplet tensors to the tool map's spatial size.
        height, width = tool_value.shape[2], tool_value.shape[3]
        v = self.elu(F.interpolate(v, (height, width)))
        X = self.elu(F.interpolate(X, (height, width)))
        return (X, (tool_key, tool_value), (verb_key, verb_value), (target_key, target_value), (q, k, v))

In [12]:
# Multi-head of self and cross attention
class MHMA(nn.Module):
    """Multi-head attention mixing the triplet head with the tool, verb and target heads.

    Args:
        depth: width of each head's query/key/value.
        num_class: channels of the output (and of the residual input ``X``).
        num_heads: number of copies of the triplet query that are stacked; the keys and
            values are always the four heads (triplet, tool, verb, target).
        use_ln: normalise the residual sum with LayerNorm sized by
            OUT_HEIGHT/OUT_WIDTH instead of BatchNorm2d.
    """

    def __init__(self, depth, num_class=100, num_heads=4, use_ln=False):
        super().__init__()
        self.concat = nn.Conv2d(depth*num_heads, num_class, kernel_size=3, padding=1)
        self.bn     = nn.BatchNorm2d(num_class)
        self.ln     = nn.LayerNorm([num_class, OUT_HEIGHT, OUT_WIDTH]) if use_ln else nn.BatchNorm2d(num_class)
        self.elu    = nn.ELU()
        self.soft   = nn.Softmax(dim=1)
        self.heads  = num_heads

    def scale_dot_product(self, key, value, query):
        """Scaled ``key @ query^T`` affinity, softmax over dim 1, applied to ``value``."""
        scale = torch.sqrt(torch.tensor(key.shape[-2], dtype=torch.float32))
        weights = self.soft(torch.matmul(key, query.transpose(-1, -2)) / scale)
        return torch.matmul(weights, value)

    def forward(self, inputs):
        """Fuse the projected heads and add the result to ``X`` (residual), then normalise."""
        X, (k1, v1), (k2, v2), (k3, v3), (q, k, v) = inputs

        # Vectors become [B,Head,D,1]; maps become [B,Head,D,H*W].
        query = torch.stack([q] * self.heads, dim=1).unsqueeze(-1)
        key = torch.stack((k, k1, k2, k3), dim=1).unsqueeze(-1)
        value = torch.stack((v, v1, v2, v3), dim=1)
        _, n_heads, width, rows, cols = value.shape
        value = value.reshape(-1, n_heads, width, rows * cols)

        attended = self.scale_dot_product(key, value, query)
        # Fold the heads into the channel axis: [B,Head*D,H,W].
        attended = attended.reshape(-1, n_heads * width, rows, cols)

        fused = self.elu(self.bn(self.concat(attended)))
        return self.ln(fused + X.clone())

In [13]:
# Feed-forward layer
class FFN(nn.Module):
    """Convolutional feed-forward block with a residual connection.

    Args:
        k: number of decoder layers that follow this one. When ``k`` is 0 (last layer)
            the second activation is skipped.
        num_class: channels of the input and output maps.
        use_ln: normalise the residual sum with LayerNorm sized by
            OUT_HEIGHT/OUT_WIDTH instead of BatchNorm2d.
    """

    def __init__(self, k, num_class=100, use_ln=False):
        super(FFN, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=num_class, out_channels=num_class, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=num_class, out_channels=num_class, kernel_size=1)
        self.elu1  = nn.ELU()
        # nn.Identity instead of a locally defined pass-through function: it is a real
        # (parameter-free) module, so the model stays picklable and shows up in the
        # module tree, while state_dict keys are unchanged.
        self.elu2  = nn.ELU() if k>0 else nn.Identity()
        self.bn1   = nn.BatchNorm2d(num_class)
        self.bn2   = nn.BatchNorm2d(num_class)
        self.ln    = nn.LayerNorm([num_class, OUT_HEIGHT, OUT_WIDTH]) if use_ln else nn.BatchNorm2d(num_class)

    def forward(self, inputs,):
        """Apply conv-bn-elu, conv-bn-(elu), add ``inputs`` and normalise."""
        x  = self.elu1(self.bn1(self.conv1(inputs)))
        x  = self.elu2(self.bn2(self.conv2(x)))
        x  = self.ln(x + inputs.clone())
        return x

In [14]:
# Classification layer
class Classifier(nn.Module):
    """Global max pooling followed by a single linear layer.

    Args:
        layer_size: accepted for interface compatibility; not used by this class.
        num_class: number of input channels and of output logits.
    """

    def __init__(self, layer_size, num_class=100):
        super().__init__()
        self.gmp = nn.AdaptiveMaxPool2d((1,1))
        self.mlp = nn.Linear(num_class, num_class)

    def forward(self, inputs):
        """Return class logits computed from the per-channel spatial maximum."""
        peaks = self.gmp(inputs)
        # Index away the two singleton spatial axes left by the (1,1) pooling.
        return self.mlp(peaks[..., 0, 0])

In [15]:
# Triplet Components Feature Encoder
def create_sliding_window_batch(data, window_size=10):
    """Turn a batch of consecutive frames into overlapping windows.

    Args:
        data: 4-D tensor (frames, channels, height, width).
        window_size: number of consecutive frames per window.

    Returns:
        Tensor of shape (frames - window_size + 1, window_size, channels, height, width);
        window ``j`` holds frames ``j .. j + window_size - 1``.

    Raises:
        ValueError: if ``window_size`` exceeds the number of frames.
    """
    num_frames, _, _, _ = data.shape

    if window_size > num_frames:
        raise ValueError("Window size must be less than or equal to the batch size")

    # `stop` is the exclusive end index of each window.
    windows = [data[stop - window_size:stop] for stop in range(window_size, num_frames + 1)]
    return torch.stack(windows, dim=0)

class Encoder(nn.Module):
    """Backbone plus tool, temporal/phase and verb/target heads.

    The input batch is treated as consecutive frames: sliding windows of 10 frames feed
    the ConvLSTM, so every output covers the batch minus its first 9 frames.

    Args:
        basename: backbone name; only selects the tool head width (64 for 'resnet18', else 128).
        num_tool, num_verb, num_target: class counts of the component heads.
        num_triplet: accepted for interface compatibility; not used here.
        hr_output: forwarded to ``BaseModel``.
    """

    def __init__(self, basename='resnet18', num_tool=6,  num_verb=10, num_target=15, num_triplet=100, hr_output=False):
        super().__init__()
        tool_depth = 64 if basename == 'resnet18' else 128
        self.basemodel  = BaseModel(hr_output)
        self.tool_wsl   = Tool_WSL(num_tool, tool_depth)
        self.lstm       = LSTMblock(hidden_channels=128)
        self.phase_wsl  = Phase_WSL(num_phase=7, in_channels=128)
        self.cagam      = CAGAM(num_tool, num_verb, num_target)

    def forward(self, x):
        """Return ``(enc_i, enc_v, enc_t, enc_ivt, phase_logits)``."""
        features, _low_level = self.basemodel(x)

        # Windows are built from ALL frames; the per-frame heads then only see the frames
        # that have a full window behind them (index 9 onwards).
        windows = create_sliding_window_batch(features)
        current = features[9:]

        tool_cam, tool_logits = self.tool_wsl(current)
        temporal = self.lstm(windows)
        enc_ivt, phase_logits, verb_phase_cam = self.phase_wsl(temporal)
        enc_v, enc_t = self.cagam(current, tool_cam, verb_phase_cam)
        return (tool_cam, tool_logits), enc_v, enc_t, enc_ivt, phase_logits

In [16]:
# MultiHead Attention Decoder
class Decoder(nn.Module):
    """Stack of projection / multi-head attention / feed-forward layers plus a classifier.

    Args:
        layer_size: number of decoder layers.
        d_model: width of the projected queries, keys and values.
        num_heads: number of attention heads.
        num_class: number of triplet classes (channels of the triplet map).
        use_ln: forwarded to the attention and feed-forward layers.
    """

    def __init__(self, layer_size, d_model, num_heads, num_class=100, use_ln=False):
        super(Decoder, self).__init__()
        self.projection = nn.ModuleList([Projection(num_class=num_class, out_depth=d_model) for i in range(layer_size)])
        self.mhma       = nn.ModuleList([MHMA(num_class=num_class, depth=d_model, num_heads=num_heads, use_ln=use_ln) for i in range(layer_size)])
        self.ffnet      = nn.ModuleList([FFN(k=layer_size-i-1, num_class=num_class, use_ln=use_ln) for i in range(layer_size)])
        # Pass num_class by keyword: Classifier's first positional parameter is layer_size,
        # so a bare Classifier(num_class) would silently keep the default 100-way head.
        self.classifier = Classifier(layer_size, num_class=num_class)

    def forward(self, enc_i, enc_v, enc_t, enc_ivt):
        """Refine the triplet map with the component CAMs and return triplet logits.

        ``enc_i``, ``enc_v`` and ``enc_t`` are ``(cam, logits)`` pairs; only the CAMs are used.
        """
        X = enc_ivt.clone()
        # Loop names avoid `F`, which would shadow torch.nn.functional inside this method.
        for project, attend, feed_forward in zip(self.projection, self.mhma, self.ffnet):
            X = project(enc_i[0], enc_v[0], enc_t[0], X)
            X = attend(X)
            X = feed_forward(X)
        logits = self.classifier(X)
        return logits

In [17]:
# Model Rendezvous
class Rendezvous(nn.Module):
    """Full model: ``Encoder`` followed by the attention ``Decoder``.

    Args:
        basename, num_tool, num_verb, num_target, hr_output: forwarded to the encoder.
        num_triplet: forwarded to the encoder and used as the decoder's class count.
        layer_size, d_model, num_heads, use_ln: forwarded to the decoder.
    """

    def __init__(self, basename="resnet18", num_tool=6, num_verb=10, num_target=15, num_triplet=100, layer_size=2, num_heads=4, d_model=128, hr_output=False, use_ln=False):
        super().__init__()
        self.encoder = Encoder(
            basename,
            num_tool,
            num_verb,
            num_target,
            num_triplet,
            hr_output=hr_output,
        )
        self.decoder = Decoder(
            layer_size,
            d_model,
            num_heads,
            num_triplet,
            use_ln=use_ln,
        )

    def forward(self, inputs):
        """Return ``(enc_i, enc_v, enc_t, dec_ivt, phase_logits)``."""
        tool_out, verb_out, target_out, triplet_map, phase_logits = self.encoder(inputs)
        triplet_logits = self.decoder(tool_out, verb_out, target_out, triplet_map)
        return tool_out, verb_out, target_out, triplet_logits, phase_logits

## Training and Testing

In [None]:
!pip install ivtmetrics

Collecting ivtmetrics
  Downloading ivtmetrics-0.1.5-py3-none-any.whl.metadata (14 kB)
Downloading ivtmetrics-0.1.5-py3-none-any.whl (17 kB)
Installing collected packages: ivtmetrics
Successfully installed ivtmetrics-0.1.5


In [None]:
import os
import sys
import time
import torch
import random
import argparse
import platform
import ivtmetrics
import numpy as np
from torch import nn
from torch.utils.data import DataLoader

In [None]:
# Training loop
def train_video_wise_loop(dataloader, model, optimizer, scheduler, loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt, activation, final_eval=False):
    """Train ``model`` for one pass over a single video's dataloader.

    Batches with fewer than 10 frames are skipped. For the others, the first 9 labels
    are dropped so the targets line up with the model outputs, the five task losses are
    summed without weighting, and both the optimizer and the scheduler are stepped once
    per batch.

    Args:
        dataloader: sized iterable yielding ``(img, (y1, y2, y3, y4, y5))`` with tool,
            verb, target, triplet and phase labels in that order.
        model: callable returning ``((cam_i, logit_i), (cam_v, logit_v),
            (cam_t, logit_t), logit_ivt, logit_phase)``.
        optimizer, scheduler: stepped after every processed batch.
        loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt: phase, tool, verb,
            target and triplet losses, each called as ``loss(logits, labels.float())``.
        activation, final_eval: accepted for interface compatibility; not used here.
    """
    model.train()
    num_batches = len(dataloader)

    for batch, (img, (y1, y2, y3, y4, y5)) in enumerate(dataloader):
        # Too short to form even one 10-frame window.
        if img.shape[0] < 10:
            continue

        # Align labels with the outputs, which start at the 10th frame of the batch.
        y1, y2, y3, y4, y5 = y1[9:], y2[9:], y3[9:], y4[9:], y5[9:]

        optimizer.zero_grad()

        (_, logit_i), (_, logit_v), (_, logit_t), logit_ivt, logit_phase = model(img)

        loss_tool = loss_fn_i(logit_i, y1.float())
        loss_verb = loss_fn_v(logit_v, y2.float())
        loss_target = loss_fn_t(logit_t, y3.float())
        loss_triplet = loss_fn_ivt(logit_ivt, y4.float())
        loss_phase = loss_fn_p(logit_phase, y5.float())

        # Unweighted sum of the five task losses.
        total_loss = loss_tool + loss_verb + loss_target + loss_triplet + loss_phase

        total_loss.backward()
        optimizer.step()
        scheduler.step()

        # Report every 10th batch and the final one.
        if batch % 10 == 0 or batch == num_batches - 1:
            print(f"Batch {batch+1}/{num_batches}, Loss: {total_loss.item()}, Tool loss: {loss_tool.item()}, Verb loss: {loss_verb.item()}, Target loss: {loss_target.item()}, Triplet loss: {loss_triplet.item()}, Phase loss: {loss_phase.item()}")

# Train for each video-wise batch
def train_video_wise(dataloader, model, optimizer, activation, scheduler, loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt):
    """Run ``train_video_wise_loop`` once per video, in the order given.

    ``dataloader`` is an iterable of per-video dataloaders; all other arguments are
    passed through unchanged.
    """
    for per_video_loader in dataloader:
        train_video_wise_loop(
            per_video_loader, model, optimizer, scheduler,
            loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt,
            activation,
        )

# Assuming your model, optimizer, and loss function are defined
# model = YourModel().cuda()
# optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# loss_fn = torch.nn.CrossEntropyLoss()  # Example, use appropriate loss for your task
# activation = torch.nn.Softmax(dim=1)  # Adjust based on your model output

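# # Start training (video-wise); pass one loss function per task, in signature order
# train_video_wise(train_dataloaders, model, optimizer, activation, scheduler, loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt)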

In [None]:
def test_loop(dataloader, model, activation, final_eval=False):
    """Evaluate ``model`` on one video and update the metric recorders.

    Relies on names defined elsewhere in the notebook namespace: the loss functions
    ``loss_fn_i``, ``loss_fn_v``, ``loss_fn_t``, ``loss_fn_ivt``, ``loss_fn_p`` and the
    recorders ``mAP`` (triplets) and, when ``final_eval`` is true, ``mAPi`` / ``mAPv`` /
    ``mAPt``.

    Batches with fewer than 10 frames are skipped and the first 9 labels of every other
    batch are dropped to line up with the model outputs. The printed losses are averaged
    over the batches that were actually evaluated.

    Args:
        dataloader: iterable yielding ``(img, (y1, y2, y3, y4, y5))`` with tool, verb,
            target, triplet and phase labels in that order.
        model: model returning ``((cam_i, logit_i), (cam_v, logit_v), (cam_t, logit_t),
            triplet_logits, phase_logits)``.
        activation: applied to logits before they are handed to the recorders.
        final_eval: also record the tool, verb and target metrics.
    """
    mAP.reset()
    if final_eval:
        mAPv.reset()
        mAPt.reset()
        mAPi.reset()

    # Running sums are plain Python floats, so no `.item()` is needed when reporting.
    loss_tool = 0.0
    loss_verb = 0.0
    loss_target = 0.0
    loss_triplet = 0.0
    loss_phase = 0.0
    total_loss = 0.0
    evaluated_batches = 0

    model.eval()
    with torch.no_grad():
        for img, (y1, y2, y3, y4, y5) in dataloader:
            # Too short to form even one 10-frame window.
            if img.shape[0] < 10:
                continue
            y1, y2, y3, y4, y5 = y1[9:], y2[9:], y3[9:], y4[9:], y5[9:]

            tool, verb, target, triplet, phase = model(img)
            _, logit_i = tool
            _, logit_v = verb
            _, logit_t = target
            logit_phase = phase

            curr_loss_tool = loss_fn_i(logit_i, y1.float()).item()
            curr_loss_verb = loss_fn_v(logit_v, y2.float()).item()
            curr_loss_target = loss_fn_t(logit_t, y3.float()).item()
            curr_loss_triplet = loss_fn_ivt(triplet, y4.float()).item()
            curr_loss_phase = loss_fn_p(logit_phase, y5.float()).item()

            loss_tool += curr_loss_tool
            loss_verb += curr_loss_verb
            loss_target += curr_loss_target
            loss_triplet += curr_loss_triplet
            loss_phase += curr_loss_phase
            total_loss += curr_loss_tool + curr_loss_verb + curr_loss_target + curr_loss_triplet + curr_loss_phase
            evaluated_batches += 1

            if final_eval:
                mAPi.update(y1.float().detach().cpu(), activation(logit_i).detach().cpu())
                mAPv.update(y2.float().detach().cpu(), activation(logit_v).detach().cpu())
                mAPt.update(y3.float().detach().cpu(), activation(logit_t).detach().cpu())
            mAP.update(y4.float().detach().cpu(), activation(triplet).detach().cpu())

    # Guard against a video in which every batch was skipped.
    denom = max(evaluated_batches, 1)
    print(f"Test loss, Loss: {total_loss/denom}, Tool loss: {loss_tool/denom}, Verb loss: {loss_verb/denom}, Target loss: {loss_target/denom}, Triplet loss: {loss_triplet/denom}, Phase loss: {loss_phase/denom}")

    mAP.video_end()
    if final_eval:
        mAPv.video_end()
        mAPt.video_end()
        mAPi.video_end()

In [None]:
def weight_mgt(score, epoch):
    """Checkpoint the model whenever the validation score beats the best so far.

    Compares `score` with the module-level `benchmark`. On improvement the
    global `model`'s state dict is written to a fixed path on Drive and
    `benchmark` is rebound to `score`.

    Args:
        score: Validation metric for this epoch; larger is better.
        epoch: Zero-based epoch index (logged one-based).

    Returns:
        "increased" when a checkpoint was written, otherwise "decreased".
    """
    # hyperparameter selection based on validation set
    global benchmark
    improved = score > benchmark.item()
    if not improved:
        return "decreased"

    ckpt_path = "/content/drive/MyDrive/BH25/try1/best.pt"
    torch.save(model.state_dict(), ckpt_path)
    # From now on `benchmark` holds the raw score object, which therefore has
    # to provide .item() for the comparison above to keep working.
    benchmark = score
    print(f'>>> Saving checkpoint for epoch {epoch+1} at {ckpt_path} ')
    return "increased"

# Silence NumPy warnings for divide-by-zero and invalid floating-point operations.
np.seterr(divide='ignore', invalid='ignore')
# Switch autograd anomaly detection off.
torch.autograd.set_detect_anomaly(False)
# NOTE(review): the two calls below only construct disabled profiler objects and
# never enter them as context managers, so they look like no-ops — confirm intent.
torch.autograd.profiler.profile(False)
torch.autograd.profiler.emit_nvtx(False)

<torch.autograd.profiler.emit_nvtx at 0x7a897ccdfd60>

In [None]:
# Point the dataset wrapper at the unzipped CholecT50 subset on Drive.
dataset = CholecT50(dataset_dir="/content/drive/MyDrive/CholecT50_unzipped/CholecT50")

# build dataset
# build() is unpacked into the train and test collections; the loops further down
# iterate each collection one video dataset at a time.
train_dataset, test_dataset = dataset.build()

def get_default_device():
  """Return the CUDA device when CUDA is available, otherwise the CPU device."""
  return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def to_device(data,device):
  """Recursively move a batch onto `device`.

  Args:
    data: A tensor-like object exposing `.to()`, a string, or an arbitrarily
      nested list/tuple of those.
    device: Target device handed to `.to()`.

  Returns:
    The moved object. Lists and tuples both come back as lists; strings are
    returned unchanged.
  """
  if isinstance(data,(list,tuple)):
    return [to_device(x,device) for x in data]
  if isinstance(data,str):
    # Strings (e.g. collated frame identifiers) have no .to(); pass them through
    # instead of raising AttributeError.
    return data
  return data.to(device,non_blocking=True)

class DeviceDataloader():
  """Wrap a dataloader so every batch it yields is first moved to `device`."""

  def __init__(self,device,dl):
    self.device=device
    self.dl=dl

  def __len__(self):
    # Number of batches, delegated to the wrapped loader.
    return len(self.dl)

  def __iter__(self):
    # Batches are transferred lazily, one at a time, as the consumer asks for them.
    yield from (to_device(batch,self.device) for batch in self.dl)

# Device every batch is moved to by the DeviceDataloader wrappers below.
device=get_default_device()

# One loader per training video, so each video is consumed as its own ordered stream.
train_dataloaders = []
for video_dataset in train_dataset:
    # Shuffle=False ensures we process frames in order
    video_dataloader = DataLoader(video_dataset, batch_size=128, shuffle=False,
                                  num_workers=2, pin_memory=True, drop_last=False)
    video_dataloader = DeviceDataloader(device, video_dataloader)
    train_dataloaders.append(video_dataloader)

print("Training datasets loaded...")

# Same construction for the test videos.
# NOTE: the loop variable `test_dataloader` stays bound to the LAST test video's
# loader after this loop finishes; later cells that use that name see only that one.
test_dataloaders = []
for video_dataset in test_dataset:
    test_dataloader = DataLoader(video_dataset, batch_size=128, shuffle=False, num_workers=2, pin_memory=True, drop_last=False)
    test_dataloader = DeviceDataloader(device, test_dataloader)
    test_dataloaders.append(test_dataloader)
print("Test Dataset loaded ...")

Training datasets loaded...
Test Dataset loaded ...


In [None]:
# Per-class positive weights, later passed as `pos_weight` to the BCE-with-logits
# losses: 6 entries for tools, 10 for verbs, 15 for targets.
# NOTE(review): how these values were derived is not shown in this notebook — confirm the source.
tool_weight     = [0.93487068, 0.94234964, 0.93487068, 1.18448115, 1.02368339, 0.97974447]
verb_weight     = [0.60002400, 0.60002400, 0.60002400, 0.61682467, 0.67082683, 0.80163207, 0.70562823, 2.11208448, 2.69230769, 0.60062402]
target_weight   = [0.49752894, 0.52041527, 0.49752894, 0.51394739, 2.71899565, 1.75577963, 0.58509403, 1.25228034, 0.49752894, 2.42993134, 0.49802647, 0.87266576, 1.36074165, 0.50150917, 0.49802647]

In [None]:
# Record the PyTorch version this run used.
print(torch.__version__)

2.5.1+cu121


In [None]:
# model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Rendezvous().to(device)
model

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 112MB/s]


Rendezvous(
  (encoder): Encoder(
    (basemodel): BaseModel(
      (basemodel): ResNet(
        (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
        (layer1): Sequential(
          (0): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (relu): ReLU(inplace=True)
            (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          )
          (1): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride

In [None]:
from torch.optim.lr_scheduler import OneCycleLR

# Best validation score seen so far; weight_mgt() reads it through .item().
benchmark   = torch.nn.Parameter(torch.tensor([0.0]), requires_grad=False)
print("Model built ...")

# Loss
# The component heads use per-class positive weights; the triplet and phase
# heads use unweighted BCE-with-logits.
activation  = nn.Sigmoid()
tool_pos_weight   = torch.tensor(tool_weight).to(device)
verb_pos_weight   = torch.tensor(verb_weight).to(device)
target_pos_weight = torch.tensor(target_weight).to(device)
loss_fn_i   = nn.BCEWithLogitsLoss(pos_weight=tool_pos_weight)
loss_fn_v   = nn.BCEWithLogitsLoss(pos_weight=verb_pos_weight)
loss_fn_t   = nn.BCEWithLogitsLoss(pos_weight=target_pos_weight)
loss_fn_ivt = nn.BCEWithLogitsLoss()
loss_fn_p   = nn.BCEWithLogitsLoss()

# evaluation metrics
# One recogniser per output space: 100 triplets, 6 tools, 10 verbs, 15 targets.
mAP  = ivtmetrics.Recognition(100)
mAPi = ivtmetrics.Recognition(6)
mAPv = ivtmetrics.Recognition(10)
mAPt = ivtmetrics.Recognition(15)
for recognizer in (mAP, mAPi, mAPv, mAPt):
    recognizer.reset_global()
print("Metrics built ...")

# The one-cycle schedule needs the number of optimiser steps per epoch, i.e. the
# batch count summed over all per-video training loaders.
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
steps_per_epoch = sum(len(loader) for loader in train_dataloaders)
scheduler = OneCycleLR(opt, max_lr=1e-3, epochs=100, steps_per_epoch=steps_per_epoch)

Model built ...
Metrics built ...


In [None]:
# Training loop
# Every epoch trains on all training videos; weights are snapshotted every 5
# epochs and the model is validated every 10 epochs.
for epoch in range(0,100):
    # Train
    train_video_wise(train_dataloaders, model, opt, activation, scheduler, loss_fn_p, loss_fn_i, loss_fn_v, loss_fn_t, loss_fn_ivt)

    if ((epoch+1)%5 == 0):
        torch.save(model.state_dict(), f"/content/drive/MyDrive/BH25/try1/{epoch+1}.pt")
    # val
    if (epoch+1) % 10 == 0:
        mAP.reset_global()
        print("Evaluating @ epoch: ", epoch)
        # Validate on every held-out video. Passing the leftover loop variable
        # `test_dataloader` would score only the last video in the test list.
        for val_dataloader in test_dataloaders:
            test_loop(val_dataloader, model, activation, final_eval=False)
        # Compute the video-wise mAP once and reuse it for logging and checkpointing.
        val_mAP = mAP.compute_video_AP()['mAP']
        print(val_mAP)
        behaviour = weight_mgt(val_mAP, epoch=epoch)



Batch 1/14, Loss: 6.0100274085998535, Tool loss: 1.0615891218185425, Verb loss: 1.309896469116211, Target loss: 1.4442378282546997, Triplet loss: 0.9419798851013184, Phase loss: 1.252324104309082
Batch 11/14, Loss: 5.128958225250244, Tool loss: 0.9290322065353394, Verb loss: 1.1195158958435059, Target loss: 1.1855696439743042, Triplet loss: 0.7714528441429138, Phase loss: 1.123387336730957
Batch 14/14, Loss: 5.525054931640625, Tool loss: 1.1109355688095093, Verb loss: 1.3133386373519897, Target loss: 1.2483489513397217, Triplet loss: 0.7461399435997009, Phase loss: 1.106291651725769
Batch 1/23, Loss: 5.242277145385742, Tool loss: 1.0459965467453003, Verb loss: 1.200860857963562, Target loss: 1.1893187761306763, Triplet loss: 0.706326961517334, Phase loss: 1.0997737646102905
Batch 11/23, Loss: 4.744808197021484, Tool loss: 0.9863380789756775, Verb loss: 1.1159216165542603, Target loss: 1.0507687330245972, Triplet loss: 0.6580628156661987, Phase loss: 0.9337166547775269
Batch 21/23, Loss

KeyboardInterrupt: 

In [None]:
# Final testing loop
#model.load_state_dict(torch.load(test_ckpt))# No need if testing in same session as training
# Clear every accumulator, not just the triplet one, so that re-running this cell
# does not fold an earlier pass into the tool/verb/target metrics.
for metric in (mAP, mAPi, mAPv, mAPt):
    metric.reset_global()
for test_dataloader in test_dataloaders:
    test_loop(test_dataloader, model, activation, final_eval=True)
# Component-wise results come from the dedicated recognisers; the association
# results (IV, IT, IVT) are all derived from the 100-class triplet recogniser.
mAP_i = mAPi.compute_video_AP(ignore_null=False)
mAP_v = mAPv.compute_video_AP(ignore_null=False)
mAP_t = mAPt.compute_video_AP(ignore_null=False)
mAP_iv = mAP.compute_video_AP('iv', ignore_null=False)
mAP_it = mAP.compute_video_AP('it', ignore_null=False)
mAP_ivt = mAP.compute_video_AP('ivt', ignore_null=False)
print('-'*50)
print('Test Results\nPer-category AP: ')
print(f'I   : {mAP_i["AP"]}')
print(f'V   : {mAP_v["AP"]}')
print(f'T   : {mAP_t["AP"]}')
print(f'IV  : {mAP_iv["AP"]}')
print(f'IT  : {mAP_it["AP"]}')
print(f'IVT : {mAP_ivt["AP"]}')
print('-'*50)
print(f'Mean AP:  I  |  V  |  T  |  IV  |  IT  |  IVT ')
print(f':::::: : {mAP_i["mAP"]:.4f} | {mAP_v["mAP"]:.4f} | {mAP_t["mAP"]:.4f} | {mAP_iv["mAP"]:.4f} | {mAP_it["mAP"]:.4f} | {mAP_ivt["mAP"]:.4f} ')
print('='*50)


## Creating the submission JSON file

In [1]:
import os
import random
import numpy as np
from PIL import Image
import torchvision.transforms as transforms
from torch.utils.data import Dataset, ConcatDataset, DataLoader

In [2]:
class T50(Dataset):
    """Frame-level dataset for a single video: one image plus five label vectors.

    Every label file is a comma-separated integer table. Column 0 is the frame
    id (also used to locate the image as `<id zero-padded to 6>.png` inside
    `img_dir`); the remaining columns are that frame's labels.

    Args:
        img_dir: Directory holding the video's frames.
        triplet_file, tool_file, verb_file, target_file, phase_file: Paths to
            the per-video label tables.
        transform: Optional callable applied to the loaded image.
        target_transform: Optional callable applied to the triplet label only.
    """

    def __init__(self, img_dir, triplet_file, tool_file, verb_file, target_file, phase_file, transform=None, target_transform=None):
        self.triplet_labels = self._load_labels(triplet_file)
        self.tool_labels = self._load_labels(tool_file)
        self.verb_labels = self._load_labels(verb_file)
        self.target_labels = self._load_labels(target_file)
        self.phase_labels = self._load_labels(phase_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    @staticmethod
    def _load_labels(path):
        """Read one label table as a 2-D int array (rows = frames)."""
        # ndmin=2 keeps a single-row file two-dimensional; without it loadtxt
        # returns a 1-D array, so len() would count columns and [index, 1:]
        # would raise.
        return np.loadtxt(path, dtype=int, delimiter=',', ndmin=2)

    def __len__(self):
        """Number of labelled frames in the video."""
        return len(self.triplet_labels)

    def __getitem__(self, index):
        """Return `(frame_id, image, (tool, verb, target, triplet, phase))`.

        `frame_id` is the zero-padded 6-character id string of the frame.
        """
        triplet_label = self.triplet_labels[index, 1:]
        tool_label = self.tool_labels[index, 1:]
        verb_label = self.verb_labels[index, 1:]
        target_label = self.target_labels[index, 1:]
        phase_label = self.phase_labels[index, 1:]

        frame_id = str(self.triplet_labels[index, 0]).zfill(6)
        img_path = os.path.join(self.img_dir, "{}.png".format(frame_id))
        # Image.open is lazy and keeps the file handle open; the context manager
        # closes it once the pixels are decoded. Converting to RGB guarantees the
        # three channels a 3-channel normalisation expects, even for palettised,
        # greyscale or RGBA PNGs.
        with Image.open(img_path) as frame:
            image = frame.convert("RGB")

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            triplet_label = self.target_transform(triplet_label)
        return frame_id, image, (tool_label, verb_label, target_label, triplet_label, phase_label)

In [3]:
class CholecT50():
    """Builds one `T50` dataset per video for a fixed train/test video split.

    `dataset_dir` is expected to contain `data/<VIDxx>/` with the frames of each
    video, plus `triplet/`, `instrument/`, `verb/`, `target/` and `phase/`
    folders that each hold one `<VIDxx>.txt` label table per video.

    The datasets are built eagerly in the constructor and retrieved with
    `build()`.
    """

    # Number of black frames / all-zero label rows add_black_frames_and_zeros prepends.
    _NUM_PADDING_FRAMES = 9

    def __init__(self,
                dataset_dir):
        self.dataset_dir = dataset_dir
        train_videos = [1, 2, 4, 5, 6, 8, 10, 12, 13, 14]
        test_videos = [92, 96, 103, 110, 111]
        self.train_records = ['VID{}'.format(str(v).zfill(2)) for v in train_videos]
        self.test_records  = ['VID{}'.format(str(v).zfill(2)) for v in test_videos]
        trainform, testform = self.transform()
        self.build_train_dataset(trainform)
        self.build_test_dataset(testform)

    def no_augumentation(self, x):
        """Identity transform: return `x` unchanged."""
        return x

    def transform(self):
        """Return `(trainform, testform)`: resize to 256x448, to-tensor, normalise.

        Both pipelines are currently identical (no training-time augmentation).
        """
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        op_test   = [transforms.Resize((256, 448)), transforms.ToTensor(), normalize]
        op_train  = [transforms.Resize((256, 448)), transforms.ToTensor(), normalize]
        testform  = transforms.Compose(op_test)
        trainform = transforms.Compose(op_train)
        return trainform, testform

    def add_black_frames_and_zeros(self,video_path, txt_files, img_shape=(256, 448)):
        """Pad a video with nine black frames and nine all-zero label rows.

        The padding frames get the ids 10000, 20000, ..., 90000. WARNING: this
        rewrites the label files and adds images in place.

        Args:
            video_path: Directory holding the video's frames.
            txt_files: Label files to pad; each gets nine rows prepended.
            img_shape: `(height, width)` of the black frames to create.
        """
        padding_ids = [(i + 1) * 10000 for i in range(self._NUM_PADDING_FRAMES)]

        # Step 1: Add 9 black frames
        height, width = img_shape
        black_frame_img = Image.fromarray(np.zeros((height, width, 3), dtype=np.uint8))
        for frame_id in padding_ids:
            # Use the same zero-padded 6-digit naming T50.__getitem__ uses to look
            # a frame up; an unpadded "10000.png" would never be found.
            save_path = os.path.join(video_path, "{}.png".format(str(frame_id).zfill(6)))
            black_frame_img.save(save_path)

        # Step 2: Add 9 lines of zeros to each text file
        padding_id_strings = {str(frame_id) for frame_id in padding_ids}
        for txt_file in txt_files:
            with open(txt_file, 'r') as file:
                lines = file.readlines()

            first_fields = lines[0].split(",")
            if first_fields[0].strip() in padding_id_strings:
                # Already padded by an earlier call; padding again would
                # prepend duplicate rows.
                continue

            # Assuming all lines have the same number of entries
            zero_fields = ",".join(["0"] * (len(first_fields) - 1))
            # Highest id first: the order produced by inserting the rows for
            # 10000..90000 at the head of the file one after another.
            padding_lines = ["{},{}\n".format(frame_id, zero_fields) for frame_id in reversed(padding_ids)]

            with open(txt_file, 'w') as file:
                file.writelines(padding_lines + lines)

    def _build_video_dataset(self, video, transform):
        """Create the `T50` dataset for one video record such as 'VID01'."""
        label_name = '{}.txt'.format(video)
        return T50(img_dir = os.path.join(self.dataset_dir, 'data', video),
                   triplet_file = os.path.join(self.dataset_dir, 'triplet', label_name),
                   tool_file = os.path.join(self.dataset_dir, 'instrument', label_name),
                   verb_file = os.path.join(self.dataset_dir, 'verb', label_name),
                   target_file = os.path.join(self.dataset_dir, 'target', label_name),
                   phase_file = os.path.join(self.dataset_dir, 'phase', label_name),
                   transform=transform)

    def build_train_dataset(self, transform):
        """Populate `self.train_dataset` with one dataset per training video."""
        self.train_dataset = [self._build_video_dataset(video, transform) for video in self.train_records]

    def build_test_dataset(self, transform):
        """Populate `self.test_dataset` with one dataset per test video.

        Prints each video's image directory as it is processed.
        """
        iterable_dataset = []
        for video in self.test_records:
            print(os.path.join(self.dataset_dir, 'data', video))
            # Padding with add_black_frames_and_zeros(...) is intentionally not
            # invoked here: it modifies the files on disk.
            iterable_dataset.append(self._build_video_dataset(video, transform))
        self.test_dataset = iterable_dataset

    def build(self):
        """Return `(train_dataset, test_dataset)`, each a list of per-video datasets."""
        return (self.train_dataset, self.test_dataset)

In [18]:
!pip install ivtmetrics

Collecting ivtmetrics
  Downloading ivtmetrics-0.1.5-py3-none-any.whl.metadata (14 kB)
Downloading ivtmetrics-0.1.5-py3-none-any.whl (17 kB)
Installing collected packages: ivtmetrics
Successfully installed ivtmetrics-0.1.5


In [19]:
import os
import sys
import time
import torch
import random
import argparse
import platform
import ivtmetrics
import numpy as np
from torch import nn
from torch.utils.data import DataLoader
import json
import torch.nn.functional as F

In [42]:
def test_loop(num,dataloader, model, activation, final_eval=False):
    #size = len(dataloader.dataset)
    num_batches = len(dataloader)

    #Dictionary of tools for each triplet(triplet key tool value)

    l = {0:0,1:0,2:0,3:0,4:0,5:0,6:0,7:0,8:0,9:0,10:0,11:0,12:0,13:0,14:0,15:0,16:0,17:0,18:0,19:0,20:0,21:0,22:1,23:1,24:1,25:1,26:1,27:1,28:1,29:1,30:1,31:1,32:1,33:1,34:1,35:1,
    36:1,37:1,38:1,39:1,40:1,41:1,42:1,43:1,44:1,45:1,46:2,47:2,48:2,49:2,50:2,51:2,52:2,53:2,54:2,55:2,56:2,57:2,58:2,59:2,60:2,61:2,62:2,63:2,64:2,65:3,66:3,67:3,68:3,69:3,70:3,
    71:3,72:3,73:3,74:3,75:3,76:3,77:4,78:4,79:4,80:4,81:4,82:5,83:5,84:5,85:5,86:5,87:5,88:5,89:5,90:5,91:5,92:5,93:5,94:0,95:1,96:2,97:3,98:4,99:5}

    json_data = {}
    last_9_imgs=[]
    batch_num=1

    with torch.no_grad():
        for batch, (frame_ids, img, (y1, y2, y3, y4, y5)) in enumerate(dataloader):
            #img, y1, y2, y3, y4, y5 = img.to(device), y1.(), y2.cuda(), y3.cuda(), y4.cuda(), y5.cuda()
            model.eval()

            if(batch_num==1):
              batch_num+=1
              frame_ids = frame_ids[9:]
            else:
              img = torch.cat((last_9_imgs, img), dim=0)

            if (img.shape[0]<10):
              continue
            last_9_imgs = img[-9:]
            tool, verb, target, triplet, phase = model(img)
            cam_i, logit_i = tool
            cam_v, logit_v = verb
            cam_t, logit_t = target
            logit_phase    = phase

            # Convert logits to probabilities

            probabilities = F.softmax(triplet, dim=-1)  # Assuming dec_ivt is the logits for triplets
            #print(probabilities.max())
            #triplets = (probabilities>0.5).float()
            triplets = probabilities.cpu().numpy()
            #print(triplets.shape)
            #print(triplets.max())
            for frame_idx, frame_id in enumerate(frame_ids):
            # Example: Assuming enc_i contains (cam, logits)

              curr_cam_i = cam_i[frame_idx]
              curr_logit_i = logit_i[frame_idx]
              curr_triplet = triplets[frame_idx]
              # Calculate probabilities from logits
              tool_probs = F.softmax(curr_logit_i, dim=-1).cpu().numpy()

              # print(np.where(tool_probs==1))
              # print(cam_i.shape)


              # Structure detection entries
              detection_entries = []
              for trip_id, trip_prob in enumerate(curr_triplet):
                  tool_id = l[trip_id]
                  tool_prob = tool_probs[tool_id]
                  curr_cam = curr_cam_i[tool_id]
                  #print(curr_cam_i.shape)
                  curr_cam = curr_cam.unsqueeze(0)
                  #print(curr_cam_i.shape)
                  probabilities = F.softmax(curr_cam_i, dim=(0))  # Softmax over spatial dimensions
                  cam = curr_cam.squeeze(0)
                  #print(cam.shape)
                  mean_val = cam.mean().item()
                  std_dev = cam.std().item()
                  threshold = mean_val + 2 * std_dev
                  binary_mask = (cam >= threshold).float()#.cpu().numpy()

                  non_zero_indices = torch.nonzero(binary_mask)
                  if non_zero_indices.numel() == 0:
                    x_left = torch.tensor([-1])
                    y_top  = torch.tensor([-1])
                    width  = torch.tensor([-1])
                    height = torch.tensor([-1])
                  else:
                    y_top, x_left = torch.min(non_zero_indices, dim=0).values
                    y_bottom, x_right = torch.max(non_zero_indices, dim=0).values
                    width = x_right-x_left
                    height = y_bottom-y_top

                  bbox_x, bbox_y, bbox_w, bbox_h = x_left.item()/14, y_top.item()/8, width.item()/14, height.item()/8
                  detection_entry = {
                    "triplet": int(trip_id),  # Assuming one triplet per frame, modify as needed
                    "instrument": [int(tool_id), float(tool_prob), float(abs(bbox_x)), float(abs(bbox_y)), float(abs(bbox_w)), float(abs(bbox_h))]
                  }
                  detection_entries.append(detection_entry)
                  #print('Worked')
              # Structure JSON data for each frame
              json_data[str(frame_id)] = {
                  "recognition": curr_triplet.tolist(),  # Probabilities for each triplet
                  "detection": detection_entries
              }

        output_path=f"/content/drive/MyDrive/BH25/try1/output{num}.json"
        # Save to JSON file
        with open(output_path, 'w') as json_file:
            json.dump(json_data, json_file, indent=4)

In [22]:
# Constructing CholecT50 builds the per-video train and test datasets and prints
# each test video's image directory.
dataset = CholecT50(dataset_dir="/content/drive/MyDrive/CholecT50_unzipped/CholecT50")

# # build dataset
# build() returns the already-built (train, test) lists of per-video datasets.
train_dataset, test_dataset = dataset.build()

def get_default_device():
  """Return the CUDA device when CUDA is available, otherwise the CPU device."""
  return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def to_device(data,device):
  """Recursively move a batch onto `device`.

  Strings are returned unchanged, lists and tuples are rebuilt as lists with
  each element moved, and anything else is moved with a non-blocking `.to()`.
  """
  if isinstance(data,str):
    return data
  if isinstance(data,(list,tuple)):
    return [to_device(item,device) for item in data]
  return data.to(device,non_blocking=True)

class DeviceDataloader():
  """Wrap a dataloader so every batch it yields is first moved to `device`."""

  def __init__(self,device,dl):
    self.device=device
    self.dl=dl

  def __len__(self):
    # Number of batches, delegated to the wrapped loader.
    return len(self.dl)

  def __iter__(self):
    # Batches are transferred lazily, one at a time, as the consumer asks for them.
    yield from (to_device(batch,self.device) for batch in self.dl)

# Device every batch is moved to by the DeviceDataloader wrappers below.
device=get_default_device()

# One loader per training video, so each video is consumed as its own ordered stream.
train_dataloaders = []
for video_dataset in train_dataset:
    # Shuffle=False ensures we process frames in order
    video_dataloader = DataLoader(video_dataset, batch_size=128, shuffle=False,
                                  num_workers=2, pin_memory=True, drop_last=False)
    video_dataloader = DeviceDataloader(device, video_dataloader)
    train_dataloaders.append(video_dataloader)

print("Training datasets loaded...")

# Same construction for the test videos; frame order is preserved (shuffle=False)
# and the final partial batch is kept (drop_last=False).
test_dataloaders = []
for video_dataset in test_dataset:
    test_dataloader = DataLoader(video_dataset, batch_size=128, shuffle=False, num_workers=2, pin_memory=True, drop_last=False)
    test_dataloader = DeviceDataloader(device, test_dataloader)
    test_dataloaders.append(test_dataloader)
print("Test Dataset loaded ...")

/content/drive/MyDrive/CholecT50_unzipped/CholecT50/data/VID92
/content/drive/MyDrive/CholecT50_unzipped/CholecT50/data/VID96
/content/drive/MyDrive/CholecT50_unzipped/CholecT50/data/VID103
/content/drive/MyDrive/CholecT50_unzipped/CholecT50/data/VID110
/content/drive/MyDrive/CholecT50_unzipped/CholecT50/data/VID111
Training datasets loaded...
Test Dataset loaded ...


In [23]:
# model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Rendezvous().to(device)
model

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 75.5MB/s]


Rendezvous(
  (encoder): Encoder(
    (basemodel): BaseModel(
      (basemodel): ResNet(
        (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
        (layer1): Sequential(
          (0): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (relu): ReLU(inplace=True)
            (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          )
          (1): BasicBlock(
            (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride

In [25]:
# Inference-only setup for writing the submission files: of the names defined
# here, only `activation` is handed to the test loop in the final cell. The
# losses and metrics are left commented out.
# NOTE(review): OneCycleLR and `benchmark` do not appear to be used by the cells
# that follow — confirm and drop them if so.
from torch.optim.lr_scheduler import OneCycleLR
benchmark   = torch.nn.Parameter(torch.tensor([0.0]), requires_grad=False)
print("Model built ...")
# Loss
activation  = nn.Sigmoid()
# loss_fn_i   = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(tool_weight).to(device))
# loss_fn_v   = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(verb_weight).to(device))
# loss_fn_t   = nn.BCEWithLogitsLoss(pos_weight=torch.tensor(target_weight).to(device))
# loss_fn_ivt = nn.BCEWithLogitsLoss()
# loss_fn_p   = nn.BCEWithLogitsLoss()

# evaluation metrics
# mAP = ivtmetrics.Recognition(100)
# mAP.reset_global()
# mAPi = ivtmetrics.Recognition(6)
# mAPv = ivtmetrics.Recognition(10)
# mAPt = ivtmetrics.Recognition(15)
# mAPi.reset_global()
# mAPv.reset_global()
# mAPt.reset_global()
# print("Metrics built ...")

Model built ...


In [43]:
# Final testing loop
# Restore the epoch-50 snapshot into the freshly built model.
# map_location loads a checkpoint saved on GPU onto whatever device this session
# uses; weights_only=True limits unpickling to tensors/state-dict content.
checkpoint = torch.load("/content/drive/MyDrive/BH25/try1/50.pt", map_location=device, weights_only=True)
model.load_state_dict(checkpoint)

# Write one submission JSON per test video, numbered from 1 in test-list order.
# No metrics are computed in this cell.
for num, test_dataloader in enumerate(test_dataloaders, start=1):
    test_loop(num,test_dataloader, model, activation, final_eval=True)
    print(f"Video{num} done")

  model.load_state_dict(torch.load("/content/drive/MyDrive/BH25/try1/50.pt"))# No need if testing in same session as training


Video1 done
Video2 done
Video3 done
Video4 done
Video5 done
