Copyright &copy; CAMMA, ICube, University of Strasbourg. All Rights Reserved.

<div>
<a href="https://cholectriplet2021.grand-challenge.org/">
<img src="https://raw.githubusercontent.com/DpkApt/evis_at/master/pictures/header.png" align="left"/>
</a>
</div>

## <h1><center>Getting Started</center></h1>


# Introduction

In this notebook, we provide sample code to help you familiarize yourself with the challenge, the dataset and the metrics. These are minimal examples that illustrate a simple deep learning pipeline applied to a small subset of the Action Triplet dataset, **CholecT50**.

# Data Loading and Visualization

In [None]:
# Import necessary libraries for this module

from PIL import Image

import csv
import matplotlib.pyplot as plt
import numpy as np
import os


print("Libraries successfully imported!")

Here, we use a small subset of the CholecT50 dataset available at this link: https://seafile.unistra.fr/f/ba1427a82ecc4ce18566/?dl=1. If you are running this notebook on Colab, you can run the cell below to download and unzip the dataset to the current directory.

In [None]:
# Ignore this cell if you have already downloaded and extracted the dataset

!wget -O CholecT50_sample.zip https://seafile.unistra.fr/f/ba1427a82ecc4ce18566/?dl=1
!unzip CholecT50_sample.zip

print("Dataset successfully extracted!")

In [None]:
# Change the dataset_path below if you have extracted the data to a different directory.

dataset_path = './CholecT50_sample/'

data_path = os.path.join(dataset_path, 'data')
triplet_path = os.path.join(dataset_path, 'triplet')
dict_path = os.path.join(dataset_path, 'dict')

# os.listdir() returns entries in arbitrary order and also lists stray files
# (e.g. '.DS_Store'). Keep only the sub-directories of data_path and sort them,
# so the videos are always visited in the same order.
video_names = sorted(
    name for name in os.listdir(data_path)
    if os.path.isdir(os.path.join(data_path, name))
)

print("Dataset paths successfully defined!")

In [None]:
# Create dictionary mapping triplet ids to readable label.
# Every non-blank line of triplet.txt is parsed as "<integer id>:<label>".

triplet_dict = {}
with open(os.path.join(dict_path, 'triplet.txt'), 'r') as f:
    for line in f:
        if not line.strip():
            continue  # tolerate blank lines instead of failing on the unpacking below
        # Split on the first ':' only, so a label that itself contains ':' stays intact.
        triplet_id, triplet_label = line.split(':', 1)
        triplet_dict[int(triplet_id)] = triplet_label.rstrip()

print('Random triplet id and its human readable label\n')
random_triplet_id = np.random.choice(list(triplet_dict))
print('Triplet id: ', random_triplet_id, '\nReadable label: ', triplet_dict[random_triplet_id])

In [None]:
def generator(data_path, triplet_path, video_names, batch_size, shuffle_videos=False):
    """Yield sequential batches of images with their triplet labels, video names and frame ids.

    Args:
        data_path:      Path to directory containing a folder for each video.
        triplet_path:   Path to folder containing a txt file for each video. Each
                        comma-separated line lists the frame id followed by the
                        binary label of every considered triplet class.
        video_names:    Names of the videos that will be returned by this generator.
                        Each name should correspond to a folder in data_path and to
                        a txt file in triplet_path.
        batch_size:     Number of frames in every yielded batch.
        shuffle_videos: If True, visit the videos in random order (frames of a video
                        are still returned sequentially). The caller's sequence is
                        not modified.

    Yields:
        A tuple of four lists, each of length batch_size:
            image batch      : images as float32 arrays divided by 255.0
            triplet batch    : int64 label vectors (the line without its first column)
            video_name_batch : video name strings
            frame_id_batch   : integer frame ids

    Note:
        A batch may span two consecutive videos. Frames left over at the very end
        that do not fill a complete batch are not yielded.
    """
    # Work on a private copy: np.random.shuffle() shuffles in place and returns
    # None, so its result must never be assigned back, and the caller's list
    # should not be reordered as a side effect.
    video_names = list(video_names)
    if shuffle_videos:
        np.random.shuffle(video_names)

    image_batch, triplet_batch, video_name_batch, frame_id_batch = [], [], [], []

    for video_name in video_names:
        with open(os.path.join(triplet_path, video_name + '.txt'), mode='r') as infile:
            for row in csv.reader(infile):
                if not row:
                    continue  # csv.reader yields an empty list for a blank line

                values = np.array(row, np.int64)
                frame_id, triplet_label = int(values[0]), values[1:]
                image_path = os.path.join(data_path, video_name, "%06d.png" % frame_id)

                # The context manager releases the file handle as soon as the
                # pixels have been copied into the array.
                with Image.open(image_path) as frame:
                    image = np.array(frame, np.float32) / 255.0

                image_batch.append(image)
                triplet_batch.append(triplet_label)
                video_name_batch.append(video_name)
                frame_id_batch.append(frame_id)

                if len(frame_id_batch) == batch_size:
                    yield image_batch, triplet_batch, video_name_batch, frame_id_batch
                    # Start fresh lists so the batch just handed out is not mutated.
                    image_batch, triplet_batch, video_name_batch, frame_id_batch = [], [], [], []
            

In [None]:
batch_size = 8
gen = generator(data_path, triplet_path, video_names, batch_size)

# Visualize the first batch only (hence the `break`).
# The third loop variable is deliberately not called `video_names`: reusing that
# name would overwrite the list of videos that was passed to the generator above.
# `images` and `triplet_labels` keep their names because they are read again
# further down in this notebook.
for images, triplet_labels, batch_video_names, frame_ids in gen:
    # Iterate over what the batch actually contains instead of assuming batch_size items.
    for image, triplet_label, video_name, frame_id in zip(
            images, triplet_labels, batch_video_names, frame_ids):
        print('\nVisualizing image...\n')
        print('Video name: ', video_name, ' Frame_id', frame_id)
        plt.imshow(image)
        plt.show()
        print('\nEncoding showing which of the 100 considered action triplets are represented in the image\n')
        print(triplet_label)
        print('\nReadable labels\n')
        # Indices of the non-zero entries are the ids of the triplets present in the frame.
        for triplet in np.where(triplet_label)[0]:
            print(triplet_dict[triplet])

    break

#  Building and Running Models 

We build a small convolutional network with a few layers and perform a simple forward pass of an image through it, to predict the probability of each of the considered triplets being represented in the input image.

Note: Please run the cells in the previous module Data Loading and Visualization before running this module

In [None]:
# Import necessary libraries for this module. You can skip ahead if you prefer a PyTorch based example.

import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np

print("Libraries successfully imported!")

Defining a simple neural network using tf.keras. You can skip ahead if you prefer to use torch.nn

In [None]:
# Defining the neural network architecture:
# five 3x3 convolutions with stride 2, followed by three fully connected layers.
# The last layer has 100 sigmoid units.
model = tf.keras.Sequential()

# The first convolution also declares the expected input shape.
model.add(tf.keras.layers.Conv2D(
    filters=16, kernel_size=3, strides=2, activation="relu", input_shape=(480, 854, 3)))

# Four identical 32-filter convolutions.
for _ in range(4):
    model.add(tf.keras.layers.Conv2D(filters=32, kernel_size=3, strides=2, activation="relu"))

model.add(tf.keras.layers.Flatten())

# Two hidden fully connected layers, then the sigmoid output layer.
for hidden_units in (4096, 2048):
    model.add(tf.keras.layers.Dense(units=hidden_units, activation="relu"))
model.add(tf.keras.layers.Dense(units=100, activation="sigmoid"))

print("Neural network architecture successfully defined!")

In [None]:
# Build the model for an input of shape [1, 480, 854, 3] (the same height, width
# and channel count declared as input_shape above), then display its summary.
model.build([1, 480, 854, 3])
model.summary()

Performing a simple forward pass

In [None]:
# Run the first image of the batch loaded earlier through the untrained Keras
# model and show the prediction next to the ground-truth label.
sample_image = images[0]
sample_label = triplet_labels[0]

# The model expects a batch, so add a leading batch dimension of size 1.
input_4d = np.expand_dims(sample_image, axis=0)

print('Performing a simple forward pass on our untrained network for a test image')
plt.imshow(sample_image)
plt.show()

print('\nPrediction\n')
print(model.predict(input_4d)[0])

print('\nLabel\n')
print(sample_label)

print('\nReadable label\n')
# Positions of the non-zero label entries are the ids of the triplets present.
for triplet_id in np.nonzero(sample_label)[0]:
    print(triplet_dict[triplet_id])

[**OPTIONAL**] Using PyTorch to make a prediction

In [None]:
# Import necessary libraries for this module.

import torch
import numpy as np  
from torch import nn
from torch.nn import Module  


In [None]:
class MyModel(Module):
    """Small convolutional network with 100 sigmoid outputs.

    Four unpadded 3x3 convolutions are applied in sequence. Each of the first
    three is followed by a ReLU and a max-pooling layer; the output of the
    fourth is flattened and fed to a single linear layer with a sigmoid.

    Args:
        h: Height of the input images, in pixels.
        w: Width of the input images, in pixels.

    Raises:
        ValueError: If h or w is too small for the convolution/pooling stack to
            produce at least one output position.
    """

    def __init__(self, h, w):
        super().__init__()
        # convolutional feature extractor
        self.conv1 = nn.Conv2d(3, 32, (3, 3))
        self.conv2 = nn.Conv2d(32, 64, (3, 3))
        self.conv3 = nn.Conv2d(64, 128, (3, 3))
        self.conv4 = nn.Conv2d(128, 256, (3, 3))
        self.pool1 = nn.MaxPool2d((3, 3), stride=(2, 2))
        self.pool2 = nn.MaxPool2d((2, 2), stride=(2, 2))
        self.pool3 = nn.MaxPool2d((2, 2), stride=(2, 2))
        # Exact spatial size of the conv4 output. The shortcut int(x/8 - 4) only
        # approximates it and is off by one for some sizes (e.g. 71 gives 4
        # instead of 5), which makes the linear layer below the wrong width.
        self.h = self._feature_size(h)
        self.w = self._feature_size(w)
        if self.h < 1 or self.w < 1:
            raise ValueError(
                "input of size %sx%s is too small for this network" % (h, w))
        self.mlp = nn.Linear(self.h * self.w * 256, 100)
        self.act1 = nn.ReLU()
        self.act2 = nn.ReLU()
        self.act3 = nn.ReLU()
        self.act4 = nn.Sigmoid()

    @staticmethod
    def _feature_size(size):
        """Return the length of one spatial axis after conv1..conv4 and pool1..pool3."""
        size = int(size)
        size = size - 2               # conv1: 3x3 kernel, no padding
        size = (size - 3) // 2 + 1    # pool1: 3x3 window, stride 2
        size = size - 2               # conv2: 3x3 kernel, no padding
        size = (size - 2) // 2 + 1    # pool2: 2x2 window, stride 2
        size = size - 2               # conv3: 3x3 kernel, no padding
        size = (size - 2) // 2 + 1    # pool3: 2x2 window, stride 2
        size = size - 2               # conv4: 3x3 kernel, no padding
        return size

    def forward(self, X):
        """Map a batch of channel-first images (N, 3, h, w) to an (N, 100) tensor of sigmoid outputs."""
        # first hidden layer
        X = self.conv1(X)
        X = self.act1(X)
        X = self.pool1(X)
        # second hidden layer
        X = self.conv2(X)
        X = self.act2(X)
        X = self.pool2(X)
        # third hidden layer
        X = self.conv3(X)
        X = self.act3(X)
        X = self.pool3(X)
        # fourth convolution; its output goes to the linear layer as is
        X = self.conv4(X)
        # Flatten everything except the batch axis. Unlike view(-1, n), this can
        # never silently mix values of different samples.
        X = X.flatten(start_dim=1)
        # output layer
        X = self.mlp(X)
        X = self.act4(X)
        return X

In [None]:
# Take the first image of the batch loaded earlier and add a batch dimension of size 1.
image_nhwc = images[0][np.newaxis]
# Converting to Channel first. NHWC --> NCHW, then wrap the array in a torch tensor.
input_4d = torch.from_numpy(np.transpose(image_nhwc, [0, 3, 1, 2]))

print('Performing a simple forward pass on our untrained network for a test image')
plt.imshow(images[0])
plt.show()

print('\nPrediction\n')
# Note: from here on `model` refers to the PyTorch network.
model = MyModel(480, 854)
print(model(input_4d)[0])

print('\nLabel\n')
print(triplet_labels[0])

print('\nReadable label\n')
# Positions of the non-zero label entries are the ids of the triplets present.
for triplet_id in np.nonzero(triplet_labels[0])[0]:
    print(triplet_dict[triplet_id])

#  Metrics and Evaluation 


In [None]:
# Import necessary libraries for this module

import numpy as np
from sklearn.metrics import average_precision_score


In [None]:
def _compute_AP(gt_labels, pd_probs, valid=None):
    """ Compute the average precision (AP) of every class.
        Args:
            gt_labels: batch of groundtruth label vectors (0's or 1's), one row per sample
                       and one column per class.
            pd_probs:  batch of predicted probability vectors, same layout as gt_labels.
            valid:     accepted for compatibility with callers; this implementation
                       does not use it.
        Returns:
            results:   list with one entry per class: the AP of the class, or the
                       string "n/a" when the class has neither groundtruth nor
                       predicted mass, or when its AP is NaN.
    """
    # Column sums: how much each class occurs in the labels / in the predictions.
    label_mass = np.sum(gt_labels, axis=0)
    score_mass = np.sum(pd_probs, axis=0)
    per_class_ap = average_precision_score(gt_labels, pd_probs, average=None)

    results = []
    class_count = np.shape(gt_labels)[-1]
    for cls in range(class_count):
        # A class is not reportable when it is absent from both labels and
        # predictions, or when the metric itself could not be computed.
        absent = (label_mass[cls] == 0) and (pd_instances_zero := (score_mass[cls] == 0))
        if absent or np.isnan(per_class_ap[cls]):
            results.append("n/a")
            continue
        results.append(per_class_ap[cls])
    return results


In [None]:
def _average_by_videos(results):
    """ Compute the average AP of each triplet class across all the videos
        and mean AP of the model on the triplet predictions.
        Args:
            results:   2D array of per-class AP, one row per video and one column per
                       class. Entries are numbers (or their string form) or "n/a".
        Returns:
            AP:    1D array with one entry per class: the mean of the class' numeric
                   entries over the videos, or "n/a" when there is none (or the mean
                   is NaN). If any entry is "n/a", NumPy stores the whole array as
                   strings.
            mAP:   mean of the numeric entries of AP (NaN when there is none).
    """
    results = np.asarray(results)
    AP = []
    for j in range(results.shape[-1]):
        scores = [float(a) for a in results[:, j] if str(a) != 'n/a']
        # Averaging an empty list would emit RuntimeWarnings before producing
        # NaN, so the "no valid video" case is handled explicitly.
        class_ap = np.mean(scores) if scores else np.float64('nan')
        AP.append("n/a" if np.isnan(class_ap) else class_ap)
    # Select numeric entries by type rather than by comparing floats to a string.
    numeric = [ap for ap in AP if not isinstance(ap, str)]
    mAP = np.mean(numeric) if numeric else np.float64('nan')
    return np.array(AP), mAP


In [None]:
# Example usage on synthetic data for two videos of 40 frames and 100 classes each:
# the ground truth is a fixed pattern (one half of the frames all 0, the other half
# all 1) and the predictions are drawn at random.

zeros_block = np.zeros((20, 100), dtype=np.int64)
ones_block = np.ones((20, 100), dtype=np.int64)

# The two videos use the same halves in opposite order.
vid230_gt = np.concatenate((zeros_block, ones_block), axis=0)
vid231_gt = np.concatenate((ones_block, zeros_block), axis=0)

vid230_pd = np.random.random((40, 100))
vid231_pd = np.random.random((40, 100))

non_null = list(range(0, 93))  # for ignoring the null triplets

# Per-class AP of each video ...
ap_vid230 = _compute_AP(gt_labels=vid230_gt, pd_probs=vid230_pd, valid=non_null)
ap_vid231 = _compute_AP(gt_labels=vid231_gt, pd_probs=vid231_pd, valid=non_null)

# ... stacked with one row per video, then averaged over the videos.
ap_vid = np.stack([ap_vid230, ap_vid231], axis=0)
AP, mAP = _average_by_videos(results=ap_vid)

print(AP, "\nmAP = ",mAP)

#  Saving Results


A minimal working example to save your model results using a docker image is provided here: https://seafile.unistra.fr/f/a495966e56e84bf0a834/?dl=1

# Supplementary

Optionally, if you want to incorporate labels for instruments, verbs and targets into your modelling, you can use the following code to decompose a triplet id into its corresponding instrument, verb and target ids, respectively, and vice versa.

In [None]:
# Import necessary libraries for this module

import numpy as np
import os

In [None]:
# Ignore this cell if you have already downloaded and extracted the dataset

!wget -O CholecT50_sample.zip https://seafile.unistra.fr/f/ba1427a82ecc4ce18566/?dl=1
!unzip CholecT50_sample.zip

print("Dataset successfully extracted!")

In [None]:
# Number of instrument, verb, target and triplet classes.
num_instrument, num_verb, num_target, num_triplet = 6, 10, 15, 100

# Integer lookup table parsed from the comma-separated maps.txt file
# (text after '#' is treated as a comment by the parser).
map_dict_url = './CholecT50_sample/dict/maps.txt'
maps_dict = np.genfromtxt(
    map_dict_url,
    delimiter=',',
    comments='#',
    dtype=int,
    skip_header=0,
)

In [None]:
def map_selector(component='iv'):
    """ Return the index associated with a component name.
        Args:
            component: one of 'ivt', 'i', 'v', 't', 'iv', 'it', 'vt'.
        Returns:
            The position of the component in that order (0 to 6); any other
            value falls back to 0, the index of 'ivt'.
    """
    ordered_names = ('ivt', 'i', 'v', 't', 'iv', 'it', 'vt')
    index_of = {name: position for position, name in enumerate(ordered_names)}
    return index_of.get(component, 0)

def decompose(inputs, component='i'):
    """ Extract the component labels from the triplets. E.g.: from a triplet ID vector[100], get the vector [6] for the used instruments (i), or the vector [15] for the target acted upon.
        Args:
            inputs: a 1D vector (array or plain sequence) of dimension (n), where n = number of triplet classes;
                    with values int(0 or 1) for target labels and float[0, 1] for predicted labels.
            component: a string for the component to extract;
                    (e.g.: i for instrument, v for verb, t for target, iv for instrument-verb pair, it for instrument-target pair and vt (unused) for verb-target pair)
        Returns:
            output: list with one value per distinct class id found in the component's column of maps_dict,
                    ordered by increasing class id. Each value is the maximum of the inputs over the
                    triplets that map to that class.
    """
    # Accept plain Python lists as well: the index-array lookup below needs an ndarray.
    inputs = np.asarray(inputs)
    # Class id of the requested component for every triplet (one row of maps_dict per triplet).
    class_of_triplet = maps_dict[:, map_selector(component)]
    output = []
    # np.unique already returns the distinct ids in sorted order.
    for class_id in np.unique(class_of_triplet):
        same_class = np.flatnonzero(class_of_triplet == class_id)
        output.append(np.max(inputs[same_class]))
    return output


In [None]:
# Create dictionaries mapping triplet ids, instrument ids, verb ids and target ids to readable labels

dict_path = './CholecT50_sample/dict/'


def _load_label_dict(file_path):
    """Parse a text file of "<integer id>:<label>" lines into an {id: label} dict."""
    labels = {}
    with open(file_path, 'r') as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines instead of failing on the unpacking below
            # Split on the first ':' only, so a label that itself contains ':' stays intact.
            label_id, label = line.split(':', 1)
            labels[int(label_id)] = label.rstrip()
    return labels


triplet_dict = _load_label_dict(os.path.join(dict_path, 'triplet.txt'))
instrument_dict = _load_label_dict(os.path.join(dict_path, 'instrument.txt'))
verb_dict = _load_label_dict(os.path.join(dict_path, 'verb.txt'))
target_dict = _load_label_dict(os.path.join(dict_path, 'target.txt'))

In [None]:
print('Random triplet id and its human readable label\n')
random_triplet_id = np.random.choice(list(triplet_dict.keys()))
print('Triplet id: ', random_triplet_id, '\nReadable label: ', triplet_dict[random_triplet_id])

# Encode the sampled triplet as a vector that is 1 at its id and 0 elsewhere.
one_hot_triplet = np.zeros(100)
one_hot_triplet[random_triplet_id] = 1

# decompose() returns one value per class of the requested component; the position
# of the first non-zero value is taken as the id of that component.
instrument_id, verb_id, target_id = (
    np.where(decompose(one_hot_triplet, component))[0][0]
    for component in ('i', 'v', 't')
)

print('Instrument id: ', instrument_id, '\nReadable label: ', instrument_dict[instrument_id])
print('Verb id: ', verb_id, '\nReadable label: ', verb_dict[verb_id])
print('Target id: ', target_id, '\nReadable label: ', target_dict[target_id])

Additionally, if you want to learn the triplets as a 3D interaction matrix, as done in [Nwoye C.I. et.al, Recognition of Instrument-Tissue Interactions in Endoscopic Videos via Action Triplets, MICCAI 2020](https://arxiv.org/abs/2007.05405), we provide code below to map a vector of triplet ids to its corresponding 3D matrix and vice versa.

In [None]:

def project_1d_to_3d(inputs):
    """ Scatter a 1D vector of triplet values into a 3D matrix of interaction.
        Args:
            inputs: a 1D vector of dimension (n), where n = number of triplet classes;
            with values int(0 or 1) for target labels and float[0, 1] for predicted labels.
        Returns:
            output: float32 3D matrix of dimension (nI, nV, nT), zero everywhere except at the
            cells that maps_dict assigns to the triplets;
            where nI = number of instrument classes,  nV = number of verb classes,  nT = number of target classes
    """
    cube = np.zeros((num_instrument, num_verb, num_target), dtype=np.float32)
    for triplet_idx, value in enumerate(inputs):
        # Columns 1, 2 and 3 of maps_dict are used as the instrument, verb and
        # target coordinates of the triplet.
        i_idx, v_idx, t_idx = maps_dict[triplet_idx, 1:4]
        cube[i_idx, v_idx, t_idx] = value
    return cube
    
def project_3d_to_1d(self=None, inputs=None):
    """ Convert triplets labels from 3D matrix of interaction to 1D vector.

        This is a module-level function, so the leading `self` parameter is only
        kept for backward compatibility. All of the following calls are accepted:
        project_3d_to_1d(matrix), project_3d_to_1d(inputs=matrix) and the
        historical project_3d_to_1d(None, matrix).

        Args:
            self:   ignored when `inputs` is given; otherwise taken as the matrix.
            inputs: a 3D matrix of dimension (nI, nV, nT);
            where nI = number of instrument classes,  nV = number of verb classes,  nT = number of target classes
        Returns:
            output: float32 1D vector of dimension (n), where n = number of triplet classes;
            entry k is the value found in the matrix at the cell that maps_dict assigns to triplet k.
        Raises:
            TypeError: if no matrix is passed at all.
    """
    if inputs is None:
        # Single-argument call: the matrix arrived in the first positional slot.
        inputs = self
    if inputs is None:
        raise TypeError("project_3d_to_1d() missing required argument: 'inputs'")

    d1 = np.zeros([num_triplet], dtype=np.float32)
    for idx in range(num_triplet):
        # Columns 1, 2 and 3 of maps_dict are used as the instrument, verb and
        # target coordinates of the triplet.
        d1[idx] = inputs[maps_dict[idx, 1], maps_dict[idx, 2], maps_dict[idx, 3]]
    return d1