Visual pipeline: CLIP_Adapter

1. Install CLIP

In [None]:
%%bash
pip install git+https://github.com/openai/CLIP.git

2. Function for loading the dataset

In [None]:
import pickle
import random
import torch
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from google.colab import drive
drive.mount('/content/drive')
import os
import h5py
import numpy as np
import pickle
os.chdir('/content/drive/MyDrive/Dataset') #need to change this directory to your specific directory
import matplotlib.pyplot as plt
import torch
import cv2
from PIL import Image
import requests
import clip

# Use the GPU when one is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# CLIP backbone and its matching image preprocessing transform.
clip_model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

# Genre names used to build the text prompts. The position of a genre in this
# list is the column it gets in the model output.
# NOTE(review): presumably this order also matches the label vectors stored in
# the dataset -- confirm against the code that produced the .pkl file.
classes = [
    "Drama", "Comedy", "Romance", "Thriller", "Crime", "Action",
    "Adventure", "Horror", "Documentary", "Mystery", "Sci-Fi", "Music",
    "Fantasy", "Family", "Biography", "War", "History", "Animation",
    "Musical", "Western", "Sport", "Short", "Film-Noir",
]

def load_small_set(dataset_path='mid_set.pkl', batch_size=32, num_train=5000, num_test=1000):
  """Load the pickled poster set, preprocess it and build the data loaders.

  The pickle file is expected to hold a sequence of ``(img, label, index)``
  tuples. Every image is passed through the module-level CLIP ``preprocess``
  transform and moved to ``device``; labels and indices are kept unchanged.

  Args:
    dataset_path: Path of the pickle file, relative to the working directory.
    batch_size: Batch size used by both loaders.
    num_train: Number of leading samples that form the training split.
    num_test: Number of samples directly after the training split that form
      the test split.

  Returns:
    ``(train_loader, test_loader)``. The training loader shuffles its
    samples; the test loader keeps them in file order.
  """
  # NOTE(security): pickle.load can execute arbitrary code while loading.
  # Only open dataset files that come from a trusted source.
  with open(dataset_path, 'rb') as file:
    raw_samples = pickle.load(file)

  # Preprocess once up front so the loaders serve ready-to-use tensors.
  # All tensors live on `device`, which is why loading needs a lot of memory.
  processed_samples = [
      (preprocess(img).to(device), label, index)
      for img, label, index in raw_samples
  ]

  train_split = processed_samples[:num_train]
  test_split = processed_samples[num_train:num_train + num_test]

  train_loader = DataLoader(train_split, batch_size=batch_size, shuffle=True)
  test_loader = DataLoader(test_split, batch_size=batch_size, shuffle=False)
  return train_loader, test_loader

# train_loader, test_loader = load_small_set()  # uncomment this line to run the examination in section 2.1

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


2.1 (optional) Examine the loaded data (to run this cell, first uncomment the final line of section 2)

This cell displays a grid of sample images from the training set.

In [None]:
import torch
import matplotlib.pyplot as plt

# Show up to `num_images_to_display` training images in a rows x cols grid.
num_images_to_display = 100
rows, cols = 10, 10
fig, axs = plt.subplots(rows, cols, figsize=(10, 10))

# Hide every axis up front so grid cells without an image stay blank.
for ax in axs.flat:
    ax.axis('off')

displayed_images = 0
for images, labels, idex in train_loader:
    for image in images:
        if displayed_images >= num_images_to_display:
            break
        ax = axs[displayed_images // cols, displayed_images % cols]
        # The loader yields channel-first tensors that were moved to `device`
        # during loading; matplotlib needs a channel-last array in host memory.
        # NOTE(review): the images went through CLIP's preprocess, which
        # presumably normalises them, so colours may look off -- confirm.
        ax.imshow(image.permute(1, 2, 0).cpu().numpy())
        displayed_images += 1

    if displayed_images >= num_images_to_display:
        break

plt.tight_layout()
plt.show()

3. Main body of the training code

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import clip
from torch.utils.tensorboard import SummaryWriter
import os
import datetime
# CUDA debugging switches: blocking launches report an error at the call that
# caused it; the second flag is intended to enable device-side assertions.
os.environ.update({
    'CUDA_LAUNCH_BLOCKING': '1',
    'TORCH_USE_CUDA_DSA': '1',
})

device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

# Tokenise one prompt per genre in a single call; the rows follow `classes`.
text_inputs = clip.tokenize(
    [f"a poster of a {c} movie" for c in classes]
).to(device)

# this is the model class for the whole visual pipeline
class MultiModalModel(nn.Module):
    """Visual pipeline: CLIP features refined by a shared residual adapter.

    Image and text features come from the module-level ``clip_model``, which
    is evaluated under ``torch.no_grad()``. Both feature sets pass through the
    same ``Adapter`` and are blended with the original CLIP features.

    Args:
        ratio: Weight of the adapter output in the blend. The original CLIP
            feature gets ``1 - ratio``. Defaults to 0.2.
    """

    def __init__(self, ratio=0.2):
        super(MultiModalModel, self).__init__()
        self.adapter = Adapter(512, 4).to(clip_model.dtype)
        # Not used in forward(), but kept as an attribute: if it is an
        # nn.Parameter it is registered on this module, so removing it would
        # change the state_dict keys and break loading of saved checkpoints.
        self.logit_scale = clip_model.logit_scale
        self.dtype = clip_model.dtype
        self.ratio = ratio

    def _blend(self, features):
        """Mix the adapter output with ``features`` and L2-normalise the result."""
        mixed = self.ratio * self.adapter(features) + (1 - self.ratio) * features
        return mixed / mixed.norm(dim=-1, keepdim=True)

    def forward(self, img, text_inputs):
        """Score every image against every text prompt.

        Args:
            img: Batch of preprocessed images accepted by
                ``clip_model.encode_image``.
            text_inputs: Tokenised prompts accepted by
                ``clip_model.encode_text``.

        Returns:
            Softmax over the prompts of the similarity between each image and
            each prompt, scaled by 100. Despite the usual "logits" naming
            these are probabilities: every row sums to 1.
        """
        # The CLIP encoders stay frozen; gradients only flow through the adapter.
        with torch.no_grad():
            img = img.to(device)
            text_inputs = text_inputs.to(device)
            image_features = clip_model.encode_image(img)
            text_features = clip_model.encode_text(text_inputs)

        image_features = self._blend(image_features)
        text_features = self._blend(text_features)

        # Both sides are unit length, so the product is a cosine similarity.
        return (100 * image_features @ text_features.t()).softmax(dim=-1)

# this is just the class for Adapter module in the pipeline
class Adapter(nn.Module):
    """Bottleneck MLP applied to CLIP features.

    Two bias-free linear layers, each followed by a ReLU: the first shrinks
    the feature size from ``c_in`` to ``c_in // reduction``, the second
    restores it to ``c_in``.

    Args:
        c_in: Size of the incoming (and outgoing) feature vectors.
        reduction: Divisor that sets the width of the hidden layer.
    """

    def __init__(self, c_in, reduction=4):
        super().__init__()
        hidden_dim = c_in // reduction
        layers = [
            nn.Linear(c_in, hidden_dim, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, c_in, bias=False),
            nn.ReLU(inplace=True),
        ]
        # The stack lives on the module-level `device` from the start.
        self.fc = nn.Sequential(*layers).to(device)

    def forward(self, x):
        """Move ``x`` to ``device`` and run it through the bottleneck."""
        return self.fc(x.to(device))


# #######################Training function####################################
def train_model(model, train_loader, optimizer, criterion, num_epochs, writer):
    """Run the optimisation loop and log the mean loss of every epoch.

    Args:
        model: Module called as ``model(img, text_inputs)``; ``text_inputs``
            is the module-level tensor of tokenised prompts.
        train_loader: Iterable yielding ``(img, labels, index)`` batches.
        optimizer: Optimiser that updates the model parameters.
        criterion: Loss called as ``criterion(outputs, labels.float())``.
        num_epochs: Number of passes over ``train_loader``.
        writer: Summary writer; receives the ``'Training Loss'`` scalar once
            per epoch and is closed when training ends.
    """
    model.train()

    num_batches = len(train_loader)
    for epoch_number in range(1, num_epochs + 1):
        running_loss = 0.0

        for img, labels, _ in train_loader:
            optimizer.zero_grad()
            predictions = model(img, text_inputs)
            targets = labels.to(device).float()
            batch_loss = criterion(predictions, targets)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()

        avg_epoch_loss = running_loss / num_batches
        writer.add_scalar('Training Loss', avg_epoch_loss, epoch_number)
        print(f"Epoch [{epoch_number}/{num_epochs}], Loss: {avg_epoch_loss:.4f}")

    writer.close()

def main(train=False, num_epochs=30, learning_rate=0.1):
    """Build the model, restore the saved weights and optionally train it.

    With the default ``train=False`` the function loads the data, builds the
    model, restores the checkpoint if it exists and freezes every parameter
    outside the adapter. That is what this cell did while the training
    section was commented out.

    Args:
        train: When true, run the training loop and save a time-stamped
            checkpoint under ``/content``.
        num_epochs: Number of training epochs (used only when ``train``).
        learning_rate: Adam learning rate (used only when ``train``).
    """
    # Loading the dataset costs a lot of RAM.
    train_loader, test_loader = load_small_set()

    model = MultiModalModel()

    # Start from previously trained weights when they are available.
    model_checkpoint_path = 'CLIP_Adapter_3layers_15epoch_best.pth'
    if os.path.exists(model_checkpoint_path):
        # map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
        model.load_state_dict(torch.load(model_checkpoint_path, map_location=device))
        print(f"Model parameters loaded from '{model_checkpoint_path}'")

    # Only parameters whose name contains 'adapter' remain trainable.
    print('Turning off gradients in both the image and the text encoder')
    for name, param in model.named_parameters():
        if 'adapter' not in name:
            param.requires_grad_(False)

    if not train:
        return

    # Loss function and optimiser.
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # TensorBoard logging.
    writer = SummaryWriter()

    train_model(model, train_loader, optimizer, criterion, num_epochs, writer)

    # Name the checkpoint after the time the run finished.
    current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    model_checkpoint_path = f'/content/model_checkpoint_{current_time}.pth'

    torch.save(model.state_dict(), model_checkpoint_path)
    print(f"Model parameters saved to '{model_checkpoint_path}'")
# Run the pipeline, then open TensorBoard inside the notebook.
if __name__ == '__main__':
    main()
    # IPython line magics: load the TensorBoard extension and point it at the
    # `runs` log directory.
    %load_ext tensorboard
    %tensorboard --logdir=runs

4. Load the checkpoint and compute the F-scores on the test set.
(The best-performing weights are saved in 'CLIP_Adapter_3layers_15epoch_best.pth'.)

In [None]:
from sklearn.metrics import f1_score

# Restore the best checkpoint and switch the model to evaluation mode.
model_checkpoint_path = 'CLIP_Adapter_3layers_15epoch_best.pth'
model = MultiModalModel()
# map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
model.load_state_dict(torch.load(model_checkpoint_path, map_location=device))
model.to(device)
model.eval()

# main() keeps its loaders local, so build the test loader here when no
# earlier cell has defined it at notebook level.
if 'test_loader' not in globals():
    _, test_loader = load_small_set()

# The model returns softmax probabilities in [0, 1], so the 50 % threshold of
# post-process rule A (report section 3.1) is 0.5 on that scale.
confidence_threshold = 0.5

test_predictions = []
test_labels = []
with torch.no_grad():
    for img, labels, idex in test_loader:
        outputs = model(img, text_inputs)
        test_labels.append(labels.cpu())
        for sample_probs in outputs:
            top_values, top_indices = sample_probs.topk(3)
            # Rule A: keep as few of the top-3 genres as are needed to reach
            # the threshold; keep all three if even the top two fall short.
            if top_values[0] >= confidence_threshold:
                num_selected = 1
            elif top_values[0] + top_values[1] >= confidence_threshold:
                num_selected = 2
            else:
                num_selected = 3
            # Multi-hot prediction vector with one slot per prompt/genre.
            predicted_after_onehot = torch.zeros(outputs.shape[-1])
            predicted_after_onehot[top_indices[:num_selected].tolist()] = 1
            test_predictions.append(predicted_after_onehot)

test_labels = torch.cat(test_labels, dim=0)
test_predictions = torch.stack(test_predictions, dim=0)

f1 = f1_score(test_labels, test_predictions, average="weighted")
print('weighted F1-score:', f1)
microf1 = f1_score(test_labels, test_predictions, average="micro")
print('micro F1-score:', microf1)
macrof1 = f1_score(test_labels, test_predictions, average="macro")
print('macro F1-score:', macrof1)

weighted F1-score: 0.499638253846981
micro F1-score: 0.5093755689058802
macro F1-score: 0.38535284705176304
