In [1]:
!mkdir dataset
!gdown 1xijq32XfEm6FPhUb7RsZYWHc2UuwVkiq
!mv refcocog.tar.gz ./dataset/
!ls dataset

Downloading...
From: https://drive.google.com/uc?id=1xijq32XfEm6FPhUb7RsZYWHc2UuwVkiq
To: /content/refcocog.tar.gz
100% 13.5G/13.5G [05:59<00:00, 37.4MB/s]
refcocog.tar.gz


In [4]:
!tar -xf dataset/refcocog.tar.gz -C dataset
!ls dataset

refcocog  refcocog.tar.gz


In [7]:
import torch
import torchvision
import torch.nn.functional as F
import torchvision.transforms as T
from torch.utils.data import Dataset
from torch.utils.tensorboard import SummaryWriter
import os
from posixpath import split
import json
import tarfile
import io
import pickle
import sys
from PIL import Image
from collections import OrderedDict
from pathlib import Path

import numpy as np
import skimage
import IPython.display
import matplotlib.pyplot as plt
from PIL import Image
from torchvision.utils import draw_bounding_boxes

# Render matplotlib figures inline, using the high-DPI ("retina") figure format.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

# Use the GPU when PyTorch can see one, otherwise the CPU; the cells below pass this to `.to(device)`.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [6]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ftfy
  Downloading ftfy-6.1.1-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.1/53.1 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ftfy
Successfully installed ftfy-6.1.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-ng7x6gi7
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-ng7x6gi7
  Resolved https://github.com/openai/CLIP.git to commit a9b1bf5920416aaeaec965c25dd9e8f98c864f16
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: clip
  Building wheel for clip (setup.py) ... [?25l[?25hdone
  Created wheel for clip: filename=clip-1.0-py3-n

In [8]:
from clip import clip

# Load the ResNet-50 CLIP checkpoint together with its matching image preprocessing callable.
model, preprocess = clip.load("RN50")

# Place the model on the selected device, then switch it to inference mode.
model = model.to(device)
model = model.eval()

100%|███████████████████████████████████████| 244M/244M [00:04<00:00, 62.5MiB/s]


In [None]:
def encode_data(images_fp: list[str], texts: list[str]):
  """Embed image files and descriptions with the notebook-level CLIP ``model``.

  Args:
    images_fp: paths of the image files to encode.
    texts: descriptions; each one is prefixed with "This is " before tokenisation.

  Returns:
    ``(images_z, texts_z)``: float feature tensors on ``device``, one row per
    image and one row per text. The features are not normalised.

  Raises:
    ValueError: if ``images_fp`` is empty.
  """
  if not images_fp:
    raise ValueError("encode_data needs at least one image path")

  # preprocess the images: file name -> PIL image -> tensor
  # (the context manager closes each file once its pixels have been read)
  preprocessed = []
  for image_fp in images_fp:
    with Image.open(image_fp) as picture:
      preprocessed.append(preprocess(picture))
  images = torch.stack(preprocessed).to(device)

  # preprocess the texts: string -> token tensor
  text_tokens = clip.tokenize(["This is " + desc for desc in texts]).to(device)

  # encode the inputs
  with torch.no_grad():
    images_z = model.encode_image(images).float().to(device)
    texts_z = model.encode_text(text_tokens).float().to(device)

  return images_z, texts_z

In [None]:
def cosine_similarity(images_z: torch.Tensor, texts_z: torch.Tensor):
  """Cosine similarity between every text feature and every image feature.

  The inputs are left untouched: normalisation works on new tensors rather than
  dividing the caller's tensors in place.

  Args:
    images_z: image features, one row per image.
    texts_z: text features, one row per text.

  Returns:
    Tensor with one row per text and one column per image, moved to ``device``.
  """
  # normalise the image and the text features to unit length
  images_unit = images_z / images_z.norm(dim=-1, keepdim=True)
  texts_unit = texts_z / texts_z.norm(dim=-1, keepdim=True)

  # evaluate the cosine similarity between the sets of features
  similarity = texts_unit @ images_unit.T

  return similarity.to(device)

In [4]:
class RefCOCOgDataset(Dataset):

  """Image / referring-sentence pairs read from a RefCOCOg directory.

  ``__init__`` loads ``annotations/refs(umd).p`` and ``annotations/instances.json``
  from ``data_dir`` and flattens them into two parallel lists, ``image_paths`` and
  ``texts``, holding one entry per sentence. Indexing returns ``(image, sentence)``.

  NOTE(review): the record keys used here ('image_id', 'sentences', 'sent' on the
  refs; 'id', 'file_name' on the images) follow the usual RefCOCOg / COCO layout.
  Confirm them against the extracted archive.
  """

  def __init__(self, root, transform=None, split=True, data_dir='dataset/refcocog'):
    """Load the annotation files and index the (image path, sentence) pairs.

    Args:
      root: dataset root; ``~`` is expanded and the result is kept on ``self.root``.
      transform: optional callable applied to each PIL image in ``__getitem__``.
      split: kept on ``self.split`` for callers; it does not affect indexing.
        Train/test splitting is left to the caller (e.g. ``random_split``).
      data_dir: directory that contains ``annotations/`` and ``images/``.
    """
    super().__init__()
    self.root = os.path.expanduser(root)
    self.transform = transform
    self.split = split
    self.data_dir = data_dir

    annotations_dir = os.path.join(data_dir, 'annotations')
    self.data = {}
    # NOTE(security): pickle.load can execute arbitrary code; only load trusted files.
    with open(os.path.join(annotations_dir, 'refs(umd).p'), 'rb') as refs_file:
      self.data['refs'] = pickle.load(refs_file)
    with open(os.path.join(annotations_dir, 'instances.json'), 'r') as instances_file:
      instances = json.load(instances_file)
    self.data['images'] = instances['images']
    self.data['annotations'] = instances['annotations']

    self.image_paths, self.texts = self.preProcess_datasets(
        self.data['refs'], self.data['annotations'], self.data['images'])

  def __getitem__(self, idx):
    """Get an item given its id.

    Args:
      idx: the integral index of the element to retrieve

    Returns:
      ``(image, sentence)``: the RGB image (after ``transform`` when one is set)
      and the referring sentence at position ``idx``.

    Raises:
      IndexError: when ``idx`` is out of range, which also ends plain iteration.
    """
    # index first so that an out-of-range idx fails before any file is touched
    image_path = self.image_paths[idx]
    sentence = self.texts[idx]

    # convert() reads the pixels, so the file can be closed right afterwards
    with Image.open(image_path) as picture:
      image = picture.convert('RGB')

    # if a transformation is available, we apply it
    if self.transform is not None:
      image = self.transform(image)

    return image, sentence

  def __len__(self):
    """Get the length of the dataset.

    Returns:
      number of (image, sentence) pairs that compose the dataset
    """
    return len(self.image_paths)

  @staticmethod
  def _sentence_texts(ref):
    """Return the sentence strings of one ref record (accepts a single dict or a list of dicts)."""
    sentences = ref['sentences']
    if isinstance(sentences, dict):
      sentences = [sentences]
    return [entry['sent'] for entry in sentences]

  def preProcess_datasets(self, refs, anns, imgs):
    """Pair every referring sentence with the path of the image it describes.

    The result has the shape ``encode_data`` consumes: image file paths and raw
    strings. Bounding boxes are not rendered here; they remain available through
    the annotation records.

    Args:
      refs: ref records; each needs 'image_id' and 'sentences'.
      anns: annotation records; accepted for interface compatibility, not read.
      imgs: image records with 'id' and 'file_name'; refs whose 'image_id' has no
        matching image record are skipped.

    Returns:
      ``(images, texts)``: two lists of equal length, one entry per sentence.
    """
    # resolve file names through the image records in a single pass
    file_name_by_id = {img['id']: img['file_name'] for img in imgs}

    images = []
    texts = []
    for ref in refs:
      file_name = file_name_by_id.get(ref['image_id'])
      if file_name is None:
        continue
      image_path = os.path.join(self.data_dir, 'images', file_name)
      for sentence in self._sentence_texts(ref):
        images.append(image_path)
        texts.append(sentence)

    return images, texts

In [None]:
def get_data(root, batch_size=64, transform=True, test_batch_size=256):
    """Build the train / validation / test dataloaders over ``RefCOCOgDataset``.

    The dataset is first split 80/20 into training and test data; the training
    part is then split in half into training and validation data.

    Args:
      root: forwarded to ``RefCOCOgDataset``.
      batch_size: batch size of the training loader.
      transform: when true, apply colour jitter, resizing and normalisation;
        otherwise only convert the PIL images to tensors.
      test_batch_size: batch size of the validation and test loaders.

    Returns:
      ``(train_loader, val_loader, test_loader)``.
    """
    if transform:
        # NOTE(review): CLIP ships its own preprocessing (see the `preprocess`
        # callable returned by clip.load). Confirm that this input size and these
        # normalisation statistics are the ones intended for the CLIP encoders.
        composed_transformation = T.Compose([
            # random changes in pixel colors
            T.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
            # resize each PIL image to 256 x 256
            T.Resize((256, 256)),
            # the former transformations accept and return PIL Image objects, now convert to Tensor
            T.ToTensor(),
            # apply normalization
            T.Normalize(mean=[0.4913, 0.4821, 0.4465], std=[0.2470, 0.2434, 0.2615]),
        ])
    else:
        # convert the PIL images to Tensors
        composed_transformation = T.Compose([T.ToTensor()])

    def split_dataset(data, first_fraction):
        # same rounding as before (fraction of the samples, plus one), clamped to the data size
        total = len(data)
        first = min(total, int(total * first_fraction + 1))
        return torch.utils.data.random_split(data, [first, total - first])

    # load data once; the loaders below are views (subsets) of this single dataset
    dataset = RefCOCOgDataset(root=root, transform=composed_transformation, split=True)

    # create train and test splits (80/20)
    full_training_data, test_data = split_dataset(dataset, 0.8)

    # create train and validation splits
    training_data, validation_data = split_dataset(full_training_data, 0.5)

    # initialize dataloaders
    train_loader = torch.utils.data.DataLoader(training_data, batch_size, shuffle=True, num_workers=4)
    val_loader = torch.utils.data.DataLoader(validation_data, test_batch_size, shuffle=False, num_workers=4)
    test_loader = torch.utils.data.DataLoader(test_data, test_batch_size, shuffle=False, num_workers=4)

    return train_loader, val_loader, test_loader

In [None]:
def cosine_similarity(images_z: torch.Tensor, texts_z: torch.Tensor):
    """Cosine similarity between every text feature and every image feature.

    This cell redefines a function of the same name from an earlier cell; keep the
    two in sync or delete one of them. The inputs are left untouched: normalisation
    works on new tensors rather than dividing the caller's tensors in place.

    Args:
      images_z: image features, one row per image.
      texts_z: text features, one row per text.

    Returns:
      Tensor with one row per text and one column per image, moved to ``device``.
    """
    # normalise the image and the text features to unit length
    images_unit = images_z / images_z.norm(dim=-1, keepdim=True)
    texts_unit = texts_z / texts_z.norm(dim=-1, keepdim=True)

    # evaluate the cosine similarity between the sets of features
    similarity = texts_unit @ images_unit.T
    return similarity.to(device)

In [None]:
def training_step(net, data_loader, optimizer, cost_function, device='cuda'):
  """Train ``net`` for one pass over ``data_loader``.

  Args:
    net: the model to optimise; it is switched to training mode.
    data_loader: iterable of ``(inputs, targets)`` batches.
    optimizer: optimizer holding the parameters to update.
    cost_function: callable ``(outputs, targets) -> loss``. The reported loss
      assumes it returns the mean over the batch, as ``CrossEntropyLoss`` does
      by default.
    device: device the batches are moved to.

  Returns:
    ``(loss, accuracy)``: the per-sample average loss and the accuracy in
    percent; ``(0.0, 0.0)`` when the loader yields no samples.
  """
  samples = 0
  cumulative_loss = 0.0
  cumulative_accuracy = 0.0

  # set the network to training mode
  net.train()

  # iterate over the training set
  for inputs, targets in data_loader:
    # load data into GPU
    inputs = inputs.to(device)
    targets = targets.to(device)

    # gradients reset (done first so stale gradients never leak into this step)
    optimizer.zero_grad()

    # forward pass
    outputs = net(inputs)

    # loss computation
    loss = cost_function(outputs, targets)

    # backward pass
    loss.backward()

    # parameters update
    optimizer.step()

    # the loss is a batch mean: weight it by the batch size so that dividing by
    # the number of samples below yields a true per-sample average
    batch_samples = inputs.shape[0]
    samples += batch_samples
    cumulative_loss += loss.item() * batch_samples

    # compute training accuracy
    predicted = outputs.argmax(dim=1)
    cumulative_accuracy += predicted.eq(targets).sum().item()

  if samples == 0:
    return 0.0, 0.0

  return cumulative_loss / samples, cumulative_accuracy / samples * 100

def test_step(net, data_loader, cost_function, device='cuda'):
  samples = 0.0
  cumulative_loss = 0.0
  cumulative_accuracy = 0.0

  # set the network to evaluation mode
  net.eval() 

  # disable gradient computation (we are only testing, we do not want our model to be modified in this step!)
  with torch.no_grad():
    # iterate over the test set
    for batch_idx, (inputs, targets) in enumerate(data_loader):
      # load data into GPU
      inputs = inputs.to(device)
      targets = targets.to(device)
        
      # forward pass
      outputs = net(inputs)

      # loss computation
      loss = cost_function(outputs, targets)

      # fetch prediction and loss value
      samples += inputs.shape[0]
      cumulative_loss += loss.item() # Note: the .item() is needed to extract scalars from tensors
      _, predicted = outputs.max(1)

      # compute accuracy
      cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_loss / samples, cumulative_accuracy / samples * 100

In [None]:
def get_cost_function():
  """Return a new cross-entropy loss module with its default settings."""
  return torch.nn.CrossEntropyLoss()

In [None]:
def get_optimizer(model, lr, wd, momentum):
  """Create an SGD optimizer over the parameters of ``model.classifier`` only.

  Args:
    model: module exposing a ``classifier`` sub-module.
    lr: learning rate of the classifier parameter group.
    wd: weight decay.
    momentum: SGD momentum.

  Returns:
    The optimizer. Its default learning rate is ``lr / 10``, but the single
    parameter group created here overrides it with ``lr``.
  """
  classifier_group = {'params': model.classifier.parameters(), 'lr': lr}
  return torch.optim.SGD(
      [classifier_group],
      lr=lr / 10,
      weight_decay=wd,
      momentum=momentum,
  )

In [None]:
class BatchNorm1d(torch.nn.Module):
  """
  Applies Batch Normalization over a 1D input (or 2D tensor)

  Shape:
    Input: (N, C)
    Output: (N, C)

  Input Parameters:
    in_features: number of features of the input activations
    track_running_stats: whether to keep track of running mean and std. (default: True)
    affine: whether to scale and shift the normalized activations. (default: True)
    momentum: the momentum value for the moving average. (default: 0.9)
    eps: value added to the std before dividing, so that a feature which is
      constant over the batch normalizes to 0 instead of NaN. (default: 1e-5)

  Usage:
    >>> # with learnable parameters
    >>> bn = BatchNorm1d(4)
    >>> # without learnable parameters
    >>> bn = BatchNorm1d(4, affine=False)
    >>> input = torch.rand(10, 4)
    >>> out = bn(input)
  """

  def __init__(self, in_features, track_running_stats=True, affine=True, momentum=0.9, eps=1e-5):
    super().__init__()

    self.in_features = in_features
    self.track_running_stats = track_running_stats
    self.affine = affine
    self.momentum = momentum
    self.eps = eps

    if self.affine:
      self.gamma = torch.nn.Parameter(torch.ones(self.in_features, 1))
      self.beta = torch.nn.Parameter(torch.zeros(self.in_features, 1))

    if self.track_running_stats:
      # register_buffer registers a tensor as a buffer that will be saved as part of the model
      # but which does not require to be trained, differently from nn.Parameter
      self.register_buffer('running_mean', torch.zeros(self.in_features, 1))
      self.register_buffer('running_std', torch.ones(self.in_features, 1))

  def forward(self, x):
    """Normalize ``x`` of shape (N, C) per feature and return a tensor of the same shape."""
    # transpose (N, C) to (C, N)
    x = x.transpose(0, 1).contiguous().view(x.shape[1], -1)

    # calculate batch mean
    mean = x.mean(dim=1).view(-1, 1)

    # calculate batch std
    std = x.std(dim=1).view(-1, 1)

    # during training keep running statistics (moving average of mean and std)
    if self.training and self.track_running_stats:
      # no computational graph is necessary to be built for this computation
      with torch.no_grad():
        self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mean
        self.running_std = self.momentum * self.running_std + (1 - self.momentum) * std

    # during inference time
    if not self.training and self.track_running_stats:
      mean = self.running_mean
      std = self.running_std

    # normalize the input activations (eps keeps the division finite when std is 0)
    x = (x - mean) / (std + self.eps)

    # scale and shift the normalized activations
    if self.affine:
      x = x * self.gamma + self.beta

    return x.transpose(0, 1)

In [None]:
class BatchNorm2d(torch.nn.Module):
  """
  Applies Batch Normalization over a 2D or 3D input (4D tensor)

  Shape:
    Input: (N, C, H, W)
    Output: (N, C, H, W)

  Input Parameters:
    in_features: number of features of the input activations
    track_running_stats: whether to keep track of running mean and std. (default: True)
    affine: whether to scale and shift the normalized activations. (default: True)
    momentum: the momentum value for the moving average. (default: 0.9)
    eps: value added to the std before dividing, so that a channel which is
      constant over the batch normalizes to 0 instead of NaN. (default: 1e-5)

  Usage:
    >>> # with learnable parameters
    >>> bn = BatchNorm2d(4)
    >>> # without learnable parameters
    >>> bn = BatchNorm2d(4, affine=False)
    >>> input = torch.rand(10, 4, 5, 5)
    >>> out = bn(input)
  """

  def __init__(self, in_features, track_running_stats=True, affine=True, momentum=0.9, eps=1e-5):
    super().__init__()

    self.in_features = in_features
    self.track_running_stats = track_running_stats
    self.affine = affine
    self.momentum = momentum
    self.eps = eps

    if self.affine:
      self.gamma = torch.nn.Parameter(torch.ones(self.in_features, 1))
      self.beta = torch.nn.Parameter(torch.zeros(self.in_features, 1))

    if self.track_running_stats:
      # register_buffer registers a tensor as a buffer that will be saved as part of the model
      # but which does not require to be trained, differently from nn.Parameter
      self.register_buffer('running_mean', torch.zeros(self.in_features, 1))
      self.register_buffer('running_std', torch.ones(self.in_features, 1))

  def forward(self, x):
    """Normalize ``x`` of shape (N, C, H, W) per channel and return a tensor of the same shape."""
    # transpose (N, C, H, W) to (C, N, H, W)
    x = x.transpose(0, 1)

    # store the shape
    c, bs, h, w = x.shape

    # collapse all dimensions except the 'channel' dimension
    x = x.contiguous().view(c, -1)

    # calculate batch mean
    mean = x.mean(dim=1).view(-1, 1)

    # calculate batch std
    std = x.std(dim=1).view(-1, 1)

    # keep running statistics (moving average of mean and std)
    if self.training and self.track_running_stats:
      # no computational graph is necessary to be built for this computation
      with torch.no_grad():
        self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mean
        self.running_std = self.momentum * self.running_std + (1 - self.momentum) * std

    # during inference time
    if not self.training and self.track_running_stats:
      mean = self.running_mean
      std = self.running_std

    # normalize the input activations (eps keeps the division finite when std is 0)
    x = (x - mean) / (std + self.eps)

    # scale and shift the normalized activations
    if self.affine:
      x = x * self.gamma + self.beta

    return x.view(c, bs, h, w).transpose(0, 1)


In [None]:
class CustomCLIP(torch.nn.Module):
  """CLIP RN50 visual encoder followed by a linear classification layer."""

  def __init__(self, num_classes: int = 10):
    """Load CLIP "RN50" and attach a ``Linear(1024, num_classes)`` head.

    Args:
      num_classes: number of outputs of the linear layer.
    """
    super().__init__()
    clip_model, _ = clip.load("RN50")

    # keep only the visual encoder of CLIP, converted to 32 bit
    # (by default CLIP is 16)
    self.encoder = clip_model.visual.float()

    # linear layer applied to the encoder output
    self.classifier = torch.nn.Linear(1024, num_classes)

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Encode ``x`` with the visual encoder and return the classifier output."""
    return self.classifier(self.encoder(x))

In [None]:
def test_step_zero_shot_clip(net, data_loader, texts_z, device='cuda'):
  samples = 0.0
  cumulative_accuracy = 0.0

  # set the network to evaluation mode
  net.eval()

  # disable gradient computation (we are only testing, we do not want our model to be modified in this step!)
  with torch.no_grad():
    # iterate over the test set
    for batch_idx, (inputs, targets) in enumerate(data_loader):
      # load data into GPU
      inputs = inputs.to(device)
      targets = targets.to(device)
        
      # forward pass
      # these two lines are different from the "traditional" ones
      images_z = model.encode_image(inputs).float()
      outputs = (100 * images_z @ texts_z.T).softmax(dim=-1)

      # fetch prediction and loss value
      samples += inputs.shape[0]
      _, predicted = outputs.max(1)

      # compute accuracy
      cumulative_accuracy += predicted.eq(targets).sum().item()

  return cumulative_accuracy / samples * 100

In [None]:
class BottleneckCLIP(torch.nn.Module):
  """CLIP RN50 visual encoder followed by a three-layer bottleneck MLP."""

  def __init__(self, bias=False):
    """Load CLIP "RN50" and build the bottleneck.

    Args:
      bias: whether the linear layers of the bottleneck use a bias term.
    """
    super().__init__()
    model, _ = clip.load("RN50")
    in_features = 1024
    out_features = 1024

    # take the visual encoder of CLIP
    # we also convert it to be 32 bit (by default CLIP is 16)
    self.encoder = model.visual.float()

    # add a bottleneck
    # (Sequential takes the layers as separate arguments, not as a single list)
    self.bottleneck = torch.nn.Sequential(
      torch.nn.Linear(in_features, in_features // 2, bias=bias),
      torch.nn.ReLU(inplace=True),
      torch.nn.Linear(in_features // 2, in_features // 2, bias=bias),
      torch.nn.ReLU(inplace=True),
      torch.nn.Linear(in_features // 2, out_features, bias=bias),
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Encode ``x`` with the visual encoder and pass the result through the bottleneck."""
    x = self.encoder(x)
    x = self.bottleneck(x)

    return x

In [None]:
# tensorboard logging utilities
def log_values(writer, step, loss, accuracy, prefix):
  """Write ``loss`` and ``accuracy`` to ``writer`` as scalars under ``prefix`` at ``step``."""
  for metric_name, metric_value in (("loss", loss), ("accuracy", accuracy)):
    writer.add_scalar(f"{prefix}/{metric_name}", metric_value, step)

In [None]:
def main(batch_size=128,
         learning_rate=0.001,
         weight_decay=0.000001,
         momentum=0.9,
         epochs=50,
         num_classes=65,
         root='/content/dataset/refcocog/'):
  """Run the zero-shot CLIP check, then train and evaluate ``CustomCLIP``.

  Args:
    batch_size: batch size of the training loader.
    learning_rate: learning rate handed to ``get_optimizer``.
    weight_decay: weight decay handed to ``get_optimizer``.
    momentum: SGD momentum handed to ``get_optimizer``.
    epochs: number of training epochs.
    num_classes: output size of the ``CustomCLIP`` classifier.
      NOTE(review): confirm this matches the targets the dataloaders yield.
    root: dataset root forwarded to ``get_data`` and ``RefCOCOgDataset``.
  """
  writer = SummaryWriter(log_dir="runs/exp1")

  def evaluate_all_splits(net, cost_function):
    # (tensorboard prefix, printed label, loss, accuracy) for each split
    return [
      ("train", "Training") + tuple(test_step(net, train_loader, cost_function, device=device)),
      ("validation", "Validation") + tuple(test_step(net, val_loader, cost_function, device=device)),
      ("test", "Test") + tuple(test_step(net, test_loader, cost_function, device=device)),
    ]

  def report(step, results):
    # log to TensorBoard and print one line per split
    for prefix, label, loss, accuracy in results:
      log_values(writer, step, loss, accuracy, prefix)
      print('\t{0} loss {1:.5f}, {0} accuracy {2:.2f}'.format(label, loss, accuracy))
    print('-----------------------------------------------------')

  try:
    # instantiates dataloaders (the validation/test batch size is get_data's default;
    # passing None to a DataLoader would switch batching off)
    train_loader, val_loader, test_loader = get_data(root=root, batch_size=batch_size, transform=True)

    # zero-shot evaluation with the pre-trained model
    # NOTE(review): encode_data expects image file paths and raw strings; confirm
    # that preProcess_datasets returns exactly that, and that the targets yielded
    # by test_loader index the rows of texts_z.
    dataset = RefCOCOgDataset(root=root)
    images, texts = dataset.preProcess_datasets(dataset.data['refs'], dataset.data['annotations'], dataset.data['images'])
    images_z, texts_z = encode_data(images, texts)
    test_accuracy = test_step_zero_shot_clip(model, test_loader, texts_z, device=device)

    # evaluate accuracy on zero-shot learning
    print(cosine_similarity(images_z, texts_z))
    print("Test accuracy {:.2f}".format(test_accuracy))

    # instantiate the network and move it to the chosen device (GPU)
    modified_model = CustomCLIP(num_classes=num_classes).to(device)

    # instantiate the optimizer
    optimizer = get_optimizer(modified_model, learning_rate, weight_decay, momentum)

    # define the cost function
    cost_function = get_cost_function()

    # computes evaluation results before training
    print('Before training:')
    report(-1, evaluate_all_splits(modified_model, cost_function))

    # for each epoch, train the network and then compute evaluation results
    for e in range(epochs):
      train_loss, train_accuracy = training_step(modified_model, train_loader, optimizer, cost_function, device=device)
      val_loss, val_accuracy = test_step(modified_model, val_loader, cost_function, device=device)

      print('Epoch: {:d}'.format(e+1))
      report(e, [
        ("train", "Training", train_loss, train_accuracy),
        ("validation", "Validation", val_loss, val_accuracy),
      ])

    # compute final evaluation results (once, after the last epoch)
    print('After training:')
    report(epochs, evaluate_all_splits(modified_model, cost_function))
  finally:
    # closes the logger, also when a step above raises
    writer.close()

tensor([0.])


TypeError: ignored

In [None]:
# Run the whole pipeline with the default hyper-parameters.
main()