<a href="https://colab.research.google.com/github/Pengwei-Yang/Deep-Learning/blob/main/Multimodal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing dependencies (timm and Hugging Face Transformers)

In [None]:
pip install timm

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting timm
  Downloading timm-0.9.2-py3-none-any.whl (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m30.1 MB/s[0m eta [36m0:00:00[0m
Collecting huggingface-hub (from timm)
  Downloading huggingface_hub-0.14.1-py3-none-any.whl (224 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.5/224.5 kB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting safetensors (from timm)
  Downloading safetensors-0.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m58.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: safetensors, huggingface-hub, timm
Successfully installed huggingface-hub-0.14.1 safetensors-0.3.1 timm-0.9.2


In [None]:
pip install transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting transformers
  Downloading transformers-4.29.1-py3-none-any.whl (7.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.1/7.1 MB[0m [31m58.0 MB/s[0m eta [36m0:00:00[0m
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1 (from transformers)
  Downloading tokenizers-0.13.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.8/7.8 MB[0m [31m78.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tokenizers, transformers
Successfully installed tokenizers-0.13.3 transformers-4.29.1


# Importing libraries

In [None]:
import os
import re
import csv
import timm
import string
import pandas as pd
import numpy as np
import statistics
import itertools
from PIL import Image
import time
from io import StringIO
from skimage import io
import matplotlib.pyplot as plt
import torch
from torch import nn
import torchvision
from torchvision import models
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.transforms import ToTensor, Lambda, Resize, Compose, ToPILImage, Normalize, RandomCrop, RandomHorizontalFlip, RandomVerticalFlip
import matplotlib.pyplot as plt
import csv
from sklearn.metrics import f1_score, confusion_matrix, precision_score, recall_score

# Drive setup

In [None]:
# Mount Google Drive at /content/drive; the dataset path (DIR) and the saved
# model path used later in this notebook both live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Note: Before running the notebook, change `DIR` (defined in the "Dataloader" section below) to the path of your own copy of the dataset ↓

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f'Using {device} device')

Using cuda device


# Dataset

In [None]:
class CustomDataset(Dataset):
    """Image/caption dataset indexed by a CSV file.

    The CSV must provide an ``ImageID`` column (file name relative to
    ``image_dir``) and a ``Caption`` column. When ``has_labels`` is true, a
    ``Labels`` column holding space-separated integers is read as well.
    """

    def __init__(self, csv_file, image_dir, transform=None, target_transform=None, has_labels = True):
        self.image_dir = image_dir
        self.transform = transform
        self.target_transform = target_transform
        self.has_labels = has_labels

        # A double quote found inside a field gets a "/" prefix; pandas is then told
        # that "/" is the escape character, so those quotes are read literally.
        stray_quote = re.compile(r'([^,])"(\s*[^\n])')
        with open(csv_file) as handle:
            patched_text = ''.join(stray_quote.sub(r'\1/"\2', raw_line) for raw_line in handle)
        self.dataframe = pd.read_csv(StringIO(patched_text), escapechar="/")

    def __len__(self):
        """Number of rows (samples) in the CSV index."""
        return len(self.dataframe)

    def _cell(self, idx, column):
        """Return the value stored at row position ``idx`` of ``column``."""
        return self.dataframe.iloc[idx, self.dataframe.columns.get_loc(column)]

    def __getitem__(self, idx):
        '''
        Fetch one sample by row position.

        Input:
        idx: row position of the sample (an int, or a tensor convertible to one)

        Output:
        sample: (img, labels, img_id, caption) when the dataset has labels,
                otherwise (img, img_id, caption)
        '''
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # The image file name doubles as the sample id.
        img_id = self._cell(idx, 'ImageID')
        img = io.imread(os.path.join(self.image_dir, img_id))
        caption = self._cell(idx, 'Caption')

        labels = None
        if self.has_labels:
            # "1 3 7" -> [1, 3, 7]
            labels = [int(token) for token in self._cell(idx, 'Labels').split(' ')]
            if self.target_transform:
                labels = self.target_transform(labels)

        if self.transform:
            img = self.transform(img)

        if self.has_labels:
            return (img, labels, img_id, caption)
        return (img, img_id, caption)

# Bert Embedding

In [None]:
from transformers import BertModel, BertTokenizer
import torch

# Run the caption encoder on the GPU when available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Fetch the pretrained tokenizer and BERT encoder, placing the encoder on `device`.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased').to(device)

def encode_captions(captions):
    """Embed captions with the notebook-level BERT ``model`` and ``tokenizer``.

    Every caption is padded or truncated to exactly 15 tokens, the whole batch
    is run through BERT in a single forward pass without gradient tracking, and
    each caption is reduced to one vector by averaging the last hidden layer
    over the token dimension. The average runs over all 15 positions, padding
    included, because no mask is applied to the mean.

    Args:
        captions: iterable of caption strings (e.g. the caption field of a
            DataLoader batch).

    Returns:
        A CPU float tensor with one row per caption; an empty tensor when
        ``captions`` is empty. Callers move it to the target device themselves.
    """
    model.eval()  # Put the model in evaluation mode

    captions = list(captions)
    if not captions:
        # The tokenizer rejects an empty batch; keep returning an empty tensor.
        return torch.tensor([])

    # Tokenize all captions at once; fixed-length padding makes the batch rectangular.
    inputs = tokenizer(captions, return_tensors='pt', padding='max_length', truncation=True, max_length=15)

    # Move the inputs to the device the encoder lives on
    inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    # (batch, tokens, hidden) -> (batch, hidden)
    return outputs.last_hidden_state.mean(dim=1).cpu()


Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/28.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

Downloading model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
def encode_outputs(output, threshold, num_labels=19):
  """Binarise per-sample score tensors into fixed-length 0/1 label vectors.

  Args:
    output: sequence of tensors; only the first row (``sample[0]``) of each
      entry is read, so entries are expected to look like ``(1, num_scores)``.
    threshold: a score greater than or equal to this value is encoded as 1.
    num_labels: length of every encoded vector (defaults to 19, the value that
      was previously hard-coded). Positions beyond the available scores stay 0.

  Returns:
    A list with one list of ``num_labels`` integers (0 or 1) per sample.

  Raises:
    IndexError: if a sample carries more scores than ``num_labels``.
  """
  encoded_outputs = []
  for sample in output:
    scores = sample[0]
    encoded_output = [1 if score.item() >= threshold else 0 for score in scores]
    if len(encoded_output) > num_labels:
      raise IndexError(
          f"sample has {len(encoded_output)} scores but num_labels is {num_labels}")
    # Pad with integer zeros so every row has the same length and element type.
    encoded_output.extend([0] * (num_labels - len(encoded_output)))
    encoded_outputs.append(encoded_output)
  return encoded_outputs

def decode_labels(labels, threshold):
  """Turn per-sample score tensors into lists of 1-based label ids.

  Only the first row (``sample[0]``) of each entry is read. A label id is the
  score's position plus one, and it is kept when the score is greater than or
  equal to ``threshold``.
  """
  return [
      [position + 1
       for position, score in enumerate(sample[0])
       if score.item() >= threshold]
      for sample in labels
  ]

# Pretrained Deit-small

In [None]:
class CustomDeiT(nn.Module):
    """Image classifier whose logits can be shifted by a caption embedding.

    ``base_model`` maps an image batch to ``num_classes`` logits. When captions
    are enabled, a linear layer projects the caption embedding to
    ``num_classes`` values that are added to those logits.

    Args:
        base_model: module producing ``num_classes`` logits per image.
        num_classes: number of output logits.
        caption_dim: size of the caption embedding fed to the projection layer.
        is_caption: when False the caption input is ignored (ablation switch).
    """

    def __init__(self, base_model, num_classes, caption_dim, is_caption = True):
        super(CustomDeiT, self).__init__()
        self.base_model = base_model
        self.caption_fc = nn.Linear(caption_dim, num_classes)
        self.num_classes = num_classes
        self.is_caption = is_caption

    def forward(self, x, caption=None):
        """Return the logits for ``x``, plus the caption projection when applicable.

        The image logits are always returned; previously a call without a
        caption fell through and returned ``None``.
        """
        x = self.base_model(x)

        # Only project the caption when it is both supplied and enabled.
        if self.is_caption and caption is not None:
            x = x + self.caption_fc(caption.type(torch.float))

        return x

# Width of the caption embedding fed to the model
# (presumably the hidden size of the BERT encoder above; confirm if the encoder changes).
caption_dim = 768
# Pretrained DeiT-small backbone with a fresh 19-way classification head.
base_model = timm.create_model('deit_small_patch16_224', pretrained=True, num_classes=19)
# Ablation experiments here, with caption and without caption.
'''
Set is_caption=True means consider caption, and is_caption=False means do not consider caption as the training data
'''
chosen_model = CustomDeiT(base_model, num_classes=19, caption_dim=caption_dim, is_caption=True).to(device)

Downloading model.safetensors:   0%|          | 0.00/88.2M [00:00<?, ?B/s]

# Dataloader

In [None]:
SEED = 42
bs = 64
NUM_LABELS = 19
TRAIN_VAL_PROP = 0.9
RESIZE_SIZE = (224, 224)
DIR = '/content/drive/MyDrive/COMP5329_Deep_Learning_Shared/Assignment_2/COMP5329S1A2Dataset'

# Use the mean and std from ImageNet
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training-time preprocessing, including random flips as augmentation.
transforms = Compose([
    ToTensor(),
    Normalize(mean = IMAGENET_MEAN, std = IMAGENET_STD),
    Resize(RESIZE_SIZE),
    RandomHorizontalFlip(),
    RandomVerticalFlip(),
])

# Deterministic preprocessing for validation and test data: same as above without
# the random flips, so evaluation results and predictions are reproducible.
eval_transforms = Compose([
    ToTensor(),
    Normalize(mean = IMAGENET_MEAN, std = IMAGENET_STD),
    Resize(RESIZE_SIZE),
])

# Converts a list of 1-based label ids into a multi-hot vector of length NUM_LABELS.
labels_to_multi_hot = Lambda(lambda y: torch.zeros(NUM_LABELS, dtype=torch.uint8).scatter_(dim=0, index=torch.sub(torch.tensor(y), 1), value=1))

main_dataset = CustomDataset(csv_file = os.path.join(DIR, "train.csv"),
                             image_dir = os.path.join(DIR, "data"),
                             transform = transforms,
                             target_transform = labels_to_multi_hot,
                             has_labels = True)

# Same rows as main_dataset, but without augmentation; used only for validation.
main_eval_dataset = CustomDataset(csv_file = os.path.join(DIR, "train.csv"),
                                  image_dir = os.path.join(DIR, "data"),
                                  transform = eval_transforms,
                                  target_transform = labels_to_multi_hot,
                                  has_labels = True)

# Derive the validation size from the training size so the two always sum to
# len(main_dataset); rounding both proportions independently can be off by one
# and make random_split raise.
n_train = int(round(TRAIN_VAL_PROP * len(main_dataset)))
n_val = len(main_dataset) - n_train
train_dataset, val_split = random_split(main_dataset,
                                        [n_train, n_val],
                                        generator=torch.Generator().manual_seed(SEED)) # Setting seed to ensure consistency

# Re-point the validation indices at the non-augmented view of the data.
val_dataset = torch.utils.data.Subset(main_eval_dataset, val_split.indices)

test_dataset = CustomDataset(csv_file = os.path.join(DIR, "test.csv"),
                             image_dir = os.path.join(DIR, "data"),
                             transform = eval_transforms,
                             has_labels = False)

main_dataloader = DataLoader(main_dataset, batch_size = bs, shuffle=True)
train_dataloader = DataLoader(train_dataset, batch_size=bs, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=1, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Define the training function

In [None]:
def train_loop(dataloader, val_dataloader, model, loss_fn, optimizer, test_threshold=None, with_captions=False, eval_every=None):
    """Train ``model`` for one pass over ``dataloader``, optionally validating.

    Every ``eval_every`` batches the current training loss is recorded and
    printed. When ``test_threshold`` is given, the model is also evaluated on
    ``val_dataloader`` at those points: predictions are passed through a
    sigmoid, binarised at ``test_threshold`` and scored with a weighted F1.

    Args:
        dataloader: yields ``(image, labels, image_id, captions)`` batches.
        val_dataloader: same batch layout; only read when ``test_threshold``
            is not None.
        model: network to train; called as ``model(X)`` or ``model(X, caption_embedding)``.
        loss_fn: loss taking ``(logits, float_targets)``.
        optimizer: optimizer bound to the model's parameters.
        test_threshold: probability cut-off for validation; ``None`` disables
            validation.
        with_captions: when True, captions are embedded with
            ``encode_captions`` and passed to the model.
        eval_every: logging/validation period in batches; defaults to the
            notebook-level ``EVAL_EVERY`` constant.

    Returns:
        ``(train_batch_losses, val_batch_losses, val_f1_scores, best_f1)`` where
        ``best_f1`` is the highest validation F1 seen during this call (0 when
        validation is disabled).
    """
    if eval_every is None:
        eval_every = EVAL_EVERY

    train_batch_losses = []
    val_batch_losses = []
    val_f1_scores = []
    best_f1 = 0

    size = len(dataloader.dataset)
    sig = nn.Sigmoid()
    # Set the model to training mode
    model.train()

    for batch, (X, y, _, captions) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        if with_captions:
            # Embed the captions and move the embeddings next to the images
            word_encoding_array = encode_captions(captions).to(device)
            pred = model(X, word_encoding_array)
        else:
            pred = model(X)

        # For Inception3 the loss is computed on the `.logits` field of the output.
        if isinstance(model, torchvision.models.inception.Inception3):
            loss = loss_fn(pred.logits, y.type(torch.float))
        else:
            loss = loss_fn(pred, y.type(torch.float))

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % eval_every != 0:
            continue

        train_loss, current = loss.item(), batch * len(X)
        train_batch_losses.append(train_loss)
        print(f'Current observation is {current}')

        if test_threshold is None:
            print(f"train loss: {train_loss:>7f}  [{current:>5d}/{size:>5d}]")
            continue

        pred_list = []
        true_list = []
        val_losses = []
        # Switch to eval mode for evaluating the validation set
        model.eval()
        with torch.no_grad():
            # Separate names keep the training batch variables intact.
            for val_X, val_y, _, val_captions in val_dataloader:
                val_X, val_y = val_X.to(device), val_y.to(device)

                if with_captions:
                    # Captions are plain strings; only their embeddings go to the device.
                    val_encoding = encode_captions(val_captions).to(device)
                    val_pred = model(val_X, val_encoding)
                else:
                    val_pred = model(val_X)

                val_losses.append(loss_fn(val_pred, val_y.type(torch.float)).item())
                true_list.extend(val_y.tolist())
                # encode_outputs reads entry[0], so store one (1, num_labels) row per sample.
                pred_list.extend(row.unsqueeze(0) for row in sig(val_pred))
        # Switch back to train mode to resume training
        model.train()

        # Report the mean loss over the whole validation set.
        test_loss = sum(val_losses) / len(val_losses) if val_losses else 0.0
        if pred_list:
            encoded_val_outputs = encode_outputs(pred_list, test_threshold)
            test_f1 = f1_score(y_true = true_list, y_pred = encoded_val_outputs, average = "weighted", zero_division = 0)
        else:
            test_f1 = 0

        val_batch_losses.append(test_loss)
        val_f1_scores.append(test_f1)
        best_f1 = max(best_f1, test_f1)

        print(f"train loss: {train_loss:>7f}  [{current:>5d}/{size:>5d}], validation loss: {test_loss:>7f}, validation mean f1: {test_f1:>7f}")

    return train_batch_losses, val_batch_losses, val_f1_scores, best_f1

# Define the test (submission) function

In [None]:
def output_to_submission(test_dataloader, model, threshold, with_captions=False):
    """Predict labels for every test sample and format them for submission.

    Args:
        test_dataloader: unshuffled loader over an unlabeled dataset yielding
            ``(image, image_id, captions)`` batches; its dataset must expose a
            ``dataframe`` with an ``ImageID`` column.
        model: trained network, called as ``model(X)`` or ``model(X, caption_embedding)``.
        threshold: sigmoid probability at or above which a label is predicted.
        with_captions: when True, captions are embedded with
            ``encode_captions`` and passed to the model.

    Returns:
        ``(text_output, dataframe_output)``: a list of rows starting with the
        header ``['ImageID', 'Labels']`` (labels joined by spaces), and the same
        content as a pandas DataFrame.
    """
    sig = torch.nn.Sigmoid()

    predictions = []
    img_ids = test_dataloader.dataset.dataframe['ImageID']

    model.eval()
    with torch.no_grad():
        for X, _, captions in test_dataloader:
            X = X.to(device)

            if with_captions:
                word_encoding_array = encode_captions(captions).to(device)
                output = model(X, word_encoding_array)
            else:
                output = model(X)

            # The output already lives on `device`; no hard-coded .cuda() needed.
            # decode_labels reads entry[0], so store one (1, num_labels) row per
            # sample, which also keeps batch sizes larger than 1 working.
            predictions.extend(row.unsqueeze(0) for row in sig(output))

    decoded_predictions = decode_labels(predictions, threshold)
    text_predictions = [" ".join(map(str, labels)) for labels in decoded_predictions]

    text_output = [['ImageID', 'Labels']]
    text_output.extend([img_id, text] for img_id, text in zip(img_ids, text_predictions))
    dataframe_output = pd.DataFrame({"ImageID": img_ids, "Labels": text_predictions})

    return text_output, dataframe_output

# Training loop

## Note: This training step may take around 3 to 4 hours for one epoch

In [None]:
LR = 0.001
NUM_EPOCHS = 5
EVAL_EVERY = 32  # read by train_loop: log (and optionally validate) every N batches

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# `model` is the BERT caption encoder created in the "Bert Embedding" section;
# the classifier being trained is `chosen_model`.
model = model.to(device)

loss_fn = nn.MultiLabelSoftMarginLoss()
optimizer = torch.optim.Adam(chosen_model.parameters(), lr=LR)

chosen_model_train_loss = []

t0 = time.time()
best_threshold = 0.4
best_f1 = 0
for t in range(NUM_EPOCHS):
    print(f"*************************************\nEpoch {t+1}")
    # Trains on the full labeled set (main_dataloader). No test_threshold is passed,
    # so train_loop does not run validation and val_loss / val_f1 stay empty.
    train_loss, val_loss, val_f1, current_best_f1 = train_loop(main_dataloader, val_dataloader, chosen_model, loss_fn, optimizer, with_captions=True)
    chosen_model_train_loss += train_loss
    # Keep the best validation F1 across epochs instead of discarding it.
    best_f1 = max(best_f1, current_best_f1)
t1 = time.time()
elapsed = t1 - t0
# Zero-pad the seconds so that e.g. 3 min 5 s prints as "3:05" rather than "3:5".
print(f"Model Trained - training time: {int(elapsed // 60)}:{int(elapsed % 60):02d}")

# Save our fine-tuned model

In [None]:
# Save the entire model object (not only its state_dict) to Google Drive.
# NOTE(review): because the whole module is serialized, loading this file later
# presumably requires the CustomDeiT class to be defined in that session — confirm.
torch.save(chosen_model, "/content/drive/MyDrive/COMP5329_Deep_Learning_Shared/Assignment_2/COMP5329S1A2Dataset/finetunemodel.pt")

# Load our fine-tuned model

In [None]:
# Specify the file path to load the saved model from
file_path = '/content/drive/MyDrive/COMP5329_Deep_Learning_Shared/Assignment_2/COMP5329S1A2Dataset/finetunemodel.pt'

# Check that the file exists at the specified file path
if not os.path.isfile(file_path):
    raise FileNotFoundError(f"Could not find file at {file_path}")

# Load the saved model from the specified file path.
# - map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
# - weights_only=False is required because the file holds a whole pickled module,
#   not just tensors. Unpickling can execute arbitrary code, so only load
#   checkpoints you created yourself.
try:
    finetunemodel = torch.load(file_path, map_location=device, weights_only=False)
except Exception as e:
    # Fail here with context instead of printing and hitting a NameError later.
    raise RuntimeError(f"Error loading saved model: {e}") from e

# Prediction for test dataset, and export the submission file

## Note: This step may take around 1 hour to output csv file

In [None]:
# `finetunemodel` is the model restored by the "Load our fine-tune model" cell;
# 0.4 is the probability threshold at or above which a label is predicted.
submission = output_to_submission(test_dataloader, finetunemodel, 0.4, with_captions=True)

# submission[0] is the list of rows (header first) returned by output_to_submission.
with open('submission_file.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows(submission[0])