<a href="https://colab.research.google.com/github/Samin765/DD2430_Project/blob/main/CLIP_prompt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Constants
Change these to fit your needs

In [60]:
# Set to True when running on Google Colab, False when developing locally.
COLAB = False

# HM_DATA_PATH: folder the Kaggle download is written to.
# PTH_SAVE_PATH: folder that torch.save and torch.load use for .pth files.
if COLAB:
    HM_DATA_PATH = "/content/drive/MyDrive/dd2430/data/"
    PTH_SAVE_PATH = "/content/drive/MyDrive/dd2430/pth/"
else:
    HM_DATA_PATH = "./data/"
    PTH_SAVE_PATH = "./pth/"

# Leave as False once the data has been downloaded.
DOWNLOAD_FROM_KAGGLE = False

# Leave as False once a .pth file has been created and saved to PTH_SAVE_PATH.
CREATE_NEW_DATASET = False

# Fraction of the data assigned to each split; the values should sum to 1.
SET_SIZES = dict(train=0.8, test=0.1, val=0.1)

# Number of samples per class in the uniform dataset.
N_SAMPLES = 500

# Imports

In [61]:
import os
import random

from tqdm import tqdm
if COLAB:
    from google.colab import files, drive
import gdown

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
from torch.utils.data import DataLoader

import transformers

In [None]:
# Pick the compute device. Precedence: Apple-silicon MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

print("Using device: ", device)

# Dataset

In [63]:
from datasets import HMDataset2, UniformHMDataset

In [None]:
# Load a previously saved uniform dataset (N_SAMPLES items per class) from
# PTH_SAVE_PATH and wrap it in a UniformHMDataset.
n_samples = N_SAMPLES
file_to_load = f"HM_data_{n_samples}.pth"
file_path = f'{PTH_SAVE_PATH}{file_to_load}'

# Fail early with a readable message when the folder or the file is missing.
assert os.path.exists(PTH_SAVE_PATH), "Folder with pth files does not exist"
assert os.path.exists(file_path), f'File {file_path} does not exist'

loaded_data = torch.load(file_path)

# The saved dict holds the image embeddings, the class names and the images.
image_emb, labels, images = (
    loaded_data[key] for key in ('image_embedding', 'class_text', 'images')
)

dataset = UniformHMDataset(image_emb, labels, images)

## Create new dataset
When `CREATE_NEW_DATASET` is True, this creates a new dataset and saves it as a `.pth` file in `PTH_SAVE_PATH` (on Google Drive when `COLAB` is True).

In [65]:
if CREATE_NEW_DATASET:
    # The CLIP model and its processor are loaded from the same checkpoint.
    clip_checkpoint = "openai/clip-vit-base-patch32"
    dataset = HMDataset2(
        articles_csv=f"{HM_DATA_PATH}articles.csv",
        image_dir=f"{HM_DATA_PATH}images",
        main_class='garment_group_name',
        model=transformers.CLIPModel.from_pretrained(clip_checkpoint).to(device),
        processor=transformers.CLIPProcessor.from_pretrained(clip_checkpoint),
    )

    # Number of samples requested per class.
    n_samples = N_SAMPLES

    # Original note: you can also set all to n_samples then set the ones you
    # want to 0.
    # NOTE(review): presumably a count of n_samples marks a class as already
    # full so it is skipped by get_n_of_each -- confirm against HMDataset2.
    for exclude_subclass in ('Unknown', 'Special Offers', 'some other'):
        dataset.counts[exclude_subclass] = n_samples

    # Build the uniform dataset.
    image_emb, labels, images = dataset.get_n_of_each(n_samples)

    # Persist under the same keys that the loading cell reads back.
    os.makedirs(PTH_SAVE_PATH, exist_ok=True)
    data_to_save = dict(image_embedding=image_emb, class_text=labels, images=images)
    torch.save(data_to_save, f'{PTH_SAVE_PATH}HM_data_{n_samples}.pth')

# Split data into train, test, and val set
Use `dataset_train`, `dataset_test`, and `dataset_val`.

In [66]:
# Split the data into train/test/val class by class.
# Sorting by label puts every class in one contiguous run; the runs are then
# cut in steps of n_samples.
# NOTE(review): this assumes every class holds exactly n_samples items --
# with uneven class sizes the chunks would straddle class boundaries.
combined = sorted(zip(labels, image_emb, images), key=lambda x: x[0])
labels, image_emb, images = zip(*combined)

train_labels, train_image_emb, train_images = [], [], []
test_labels, test_image_emb, test_images = [], [], []
val_labels, val_image_emb, val_images = [], [], []

# Iterate up to len(combined): the former bound of len(combined) - 1 silently
# dropped the last sample whenever exactly one item was left over.
for i in range(0, len(combined), n_samples):
    labels_sub = labels[i : i + n_samples]
    image_emb_sub = image_emb[i : i + n_samples]
    images_sub = images[i : i + n_samples]

    # Cut points inside this chunk. The val split takes whatever remains
    # after train and test, so SET_SIZES["val"] is not read here.
    chunk_len = float(len(labels_sub))
    train_end = int(chunk_len * SET_SIZES["train"])
    test_end = train_end + int(chunk_len * SET_SIZES["test"])

    train_labels.extend(labels_sub[:train_end])
    train_image_emb.extend(image_emb_sub[:train_end])
    train_images.extend(images_sub[:train_end])

    test_labels.extend(labels_sub[train_end:test_end])
    test_image_emb.extend(image_emb_sub[train_end:test_end])
    test_images.extend(images_sub[train_end:test_end])

    val_labels.extend(labels_sub[test_end:])
    val_image_emb.extend(image_emb_sub[test_end:])
    val_images.extend(images_sub[test_end:])

# shuffle the data in each set
def shuffle_set(labels, image_emb, images):
    """Shuffle three parallel sequences with one shared permutation.

    Item ``k`` of each returned sequence still belongs to the same sample,
    so labels, embeddings and images stay aligned.

    Args:
        labels: class labels, one per sample.
        image_emb: image embeddings, one per sample.
        images: images, one per sample.

    Returns:
        A tuple ``(labels, image_emb, images)`` of three tuples. For empty
        input three empty tuples are returned, so callers can always unpack
        the result into three names.
    """
    combined = list(zip(labels, image_emb, images))
    if not combined:
        # zip(*[]) yields nothing, which would break 3-way unpacking.
        return (), (), ()
    random.shuffle(combined)
    return tuple(zip(*combined))

# Shuffle each split on its own (train, then test, then val) so labels,
# embeddings and images stay aligned within a split.
train_split = shuffle_set(train_labels, train_image_emb, train_images)
test_split = shuffle_set(test_labels, test_image_emb, test_images)
val_split = shuffle_set(val_labels, val_image_emb, val_images)

train_labels, train_image_emb, train_images = train_split
test_labels, test_image_emb, test_images = test_split
val_labels, val_image_emb, val_images = val_split

# Wrap every split in a dataset; note the (embeddings, labels, images) order.
dataset_train = UniformHMDataset(train_image_emb, train_labels, train_images)
dataset_test = UniformHMDataset(test_image_emb, test_labels, test_images)
dataset_val = UniformHMDataset(val_image_emb, val_labels, val_images)

# Coding

# Evaluate

In [None]:
# Load the pretrained CLIP model (moved to the selected device) and the
# processor that belongs to the same checkpoint.
clip_checkpoint = "openai/clip-vit-base-patch32"
model = transformers.CLIPModel.from_pretrained(clip_checkpoint)
model = model.to(device)
processor = transformers.CLIPProcessor.from_pretrained(clip_checkpoint)

# make sure image values: False=> [0-1] and True=> [0,255]
# NOTE(review): recent transformers releases expose this object as
# `processor.image_processor` -- confirm which attribute your version offers.
processor.feature_extractor.do_rescale = False

In [68]:
# One loader per split. Only the training loader reshuffles its data;
# val and test keep a fixed order.
batch_size = 128
dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=True)
dataloader_val, dataloader_test = (
    DataLoader(split, batch_size=batch_size, shuffle=False)
    for split in (dataset_val, dataset_test)
)

In [69]:
# functions for clip text and image embeddings, forward pass etc
# remember to import this again if you change something
import model_functions
# various functions that don't fit into model_functions or datasets,
# for example displaying images
import utils

**Baseline model**

In [None]:
# Zero-shot baseline: each test image embedding is scored against the CLIP
# text embedding of every class name and the best-scoring class is predicted.
show_image = True
all_predictions_baseline = []
all_labels = []
model.eval()  # make sure no train-mode behaviour is active while evaluating
with torch.no_grad():
    # The class prompts are the same for every batch, so embed them once
    # instead of once per batch.
    text = [' '+i for i in dataset.classes] # for prediction
    text_embeds = model_functions.get_text_emb(model, processor, text)

    for batch_nr, (image_embeds, labels, images) in enumerate(tqdm(dataloader_test)):
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model)

        if show_image and batch_nr % 40 == 0:
            images = utils.return_normal(images, processor, 4, True)

        # Index of the highest-scoring class prompt for every image.
        predicted_class_base = logits_per_image.argmax(dim=-1)

        all_predictions_baseline.append(predicted_class_base)
        # Translate the ground-truth class names into ids for comparison.
        all_labels.extend(dataset.class_to_id[lab] for lab in labels)

all_predictions_baseline = torch.cat(all_predictions_baseline).cpu()

correct_base = all_predictions_baseline == torch.tensor(all_labels).cpu()
print(f'\n Accuracy baseline {100*correct_base.sum()/correct_base.shape[0]} %')

# Finetune 

## Fully connected layer to image part

Add a fully connected layer to the end of the image model.

In [71]:
# Trainable 512 -> 512 linear layer applied on top of the image embeddings.
extra_image_layer = nn.Linear(in_features=512, out_features=512).to(device)

In [72]:
# Adam updates only the parameters of the extra image layer.
optimizer = torch.optim.Adam(params=extra_image_layer.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the extra image layer on the training split.
# `optimizer` is whatever the preceding cell created; there it is built from
# extra_image_layer's parameters only.
show_image = True
model.train()
loss_list = []  # summed batch loss, one entry per epoch
epochs = 20
# NOTE(review): this loader over the full `dataset` is not used by the loop
# below, which iterates dataloader_train. Kept so the name stays defined.
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
for epoch in tqdm(range(epochs)):
    running_loss = 0.0
    # `images` is not needed for training; the former `images.to(model.device)`
    # discarded its result, so that per-batch transfer has been removed.
    for batch_nr, (image_embeds, labels, images) in enumerate(dataloader_train):
        optimizer.zero_grad()
        text = list(labels) # for training, must use 1-1 map
        text_embeds = model_functions.get_text_emb(model, processor, text)
        image_embeds = extra_image_layer(image_embeds)
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model, train=True)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    loss_list.append(running_loss)

In [None]:
# Plot the summed training loss of every epoch (x axis runs 1..epochs).
epoch_axis = list(range(1, epochs + 1))

plt.figure(figsize=(10, 6))
plt.plot(epoch_axis, loss_list, label='Training Loss')

# Axis labels and title
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training Loss Over Datapoints')

plt.grid(True)
plt.legend()
plt.show()

Evaluate the fully connected image layer

In [None]:
# Evaluate on the test split with the extra image layer applied to the
# image embeddings; the text embeddings are used as CLIP produces them.
show_image = True
all_predictions_image_finetune = []
all_labels = []
model.eval()  # the training cell left the model in train mode
with torch.no_grad():
    # The class prompts are the same for every batch, so embed them once
    # instead of once per batch.
    text = [' '+i for i in dataset.classes] # for prediction
    text_embeds = model_functions.get_text_emb(model, processor, text)

    for batch_nr, (image_embeds, labels, images) in enumerate(tqdm(dataloader_test)):
        image_embeds = extra_image_layer(image_embeds)
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model)

        if show_image and batch_nr % 40 == 0:
            images = utils.return_normal(images, processor, 4, True)

        # Index of the highest-scoring class prompt for every image.
        predicted_class_base = logits_per_image.argmax(dim=-1)

        all_predictions_image_finetune.append(predicted_class_base)
        # Translate the ground-truth class names into ids for comparison.
        all_labels.extend(dataset.class_to_id[lab] for lab in labels)

all_predictions_image_finetune = torch.cat(all_predictions_image_finetune).cpu()

correct_finetuned_image = all_predictions_image_finetune == torch.tensor(all_labels).cpu()
print(f'\n Accuracy image_finetune {100*correct_finetuned_image.sum()/correct_finetuned_image.shape[0]} %')

## Fully connected to text model

In [76]:
# Trainable 512 -> 512 linear layer applied on top of the text embeddings.
extra_text_layer = nn.Linear(in_features=512, out_features=512).to(device)

In [77]:
# Adam updates only the parameters of the extra text layer.
optimizer = torch.optim.Adam(params=extra_text_layer.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

Train last text layer

In [None]:
# Train the extra text layer on the training split.
# `optimizer` is whatever the preceding cell created; there it is built from
# extra_text_layer's parameters only.
show_image = True
model.train()
loss_list = []  # summed batch loss, one entry per epoch
epochs = 20
# NOTE(review): this loader over the full `dataset` is not used by the loop
# below, which iterates dataloader_train. Kept so the name stays defined.
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
for epoch in tqdm(range(epochs)):
    running_loss = 0.0
    # `images` is not needed for training; the former `images.to(model.device)`
    # discarded its result, so that per-batch transfer has been removed.
    for batch_nr, (image_embeds, labels, images) in enumerate(dataloader_train):
        optimizer.zero_grad()
        text = list(labels) # for training, must use 1-1 map
        text_embeds = model_functions.get_text_emb(model, processor, text)
        text_embeds = extra_text_layer(text_embeds)
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model, train=True)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    loss_list.append(running_loss)

Evaluate extra text layer

In [None]:
# Evaluate on the test split with the extra text layer applied to the class
# text embeddings; the image embeddings are used as stored.
show_image = True
all_predictions_text_finetune = []
all_labels = []
model.eval()  # the training cell left the model in train mode
with torch.no_grad():
    # Prompts and their projection are the same for every batch, so compute
    # them once instead of once per batch.
    text = [' '+i for i in dataset.classes] # for prediction
    text_embeds = model_functions.get_text_emb(model, processor, text)
    text_embeds = extra_text_layer(text_embeds)

    for batch_nr, (image_embeds, labels, images) in enumerate(tqdm(dataloader_test)):
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model)

        if show_image and batch_nr % 40 == 0:
            images = utils.return_normal(images, processor, 4, True)

        # Index of the highest-scoring class prompt for every image.
        predicted_class_base = logits_per_image.argmax(dim=-1)

        all_predictions_text_finetune.append(predicted_class_base)
        # Translate the ground-truth class names into ids for comparison.
        all_labels.extend(dataset.class_to_id[lab] for lab in labels)

all_predictions_text_finetune = torch.cat(all_predictions_text_finetune).cpu()

correct_finetuned_text = all_predictions_text_finetune == torch.tensor(all_labels).cpu()
print(f'\n Accuracy text_finetune {100*correct_finetuned_text.sum()/correct_finetuned_text.shape[0]} %')

Evaluate both layers, trained separately

In [None]:
# Evaluate on the test split using BOTH separately trained layers:
# extra_text_layer on the class text embeddings and extra_image_layer on the
# image embeddings.
show_image = True
all_predictions_both_finetune = []
all_labels = []
model.eval()  # the training cells left the model in train mode
with torch.no_grad():
    # Fix: the text layer used to be applied to a stale `text_embeds` left
    # over from an earlier cell, and its output was never passed on --
    # apply_clip received the raw text embeddings. Project the freshly
    # computed embeddings once and use that result for scoring.
    text = [' '+i for i in dataset.classes] # for prediction
    text_embeds = model_functions.get_text_emb(model, processor, text)
    text_embeds = extra_text_layer(text_embeds)

    for batch_nr, (image_embeds, labels, images) in enumerate(tqdm(dataloader_test)):
        image_embeds = extra_image_layer(image_embeds)
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model)

        if show_image and batch_nr % 40 == 0:
            images = utils.return_normal(images, processor, 4, True)

        # Index of the highest-scoring class prompt for every image.
        predicted_class_base = logits_per_image.argmax(dim=-1)

        all_predictions_both_finetune.append(predicted_class_base)
        # Translate the ground-truth class names into ids for comparison.
        all_labels.extend(dataset.class_to_id[lab] for lab in labels)

all_predictions_both_finetune = torch.cat(all_predictions_both_finetune).cpu()

correct_finetuned_both = all_predictions_both_finetune == torch.tensor(all_labels).cpu()
print(f'\n Accuracy both_finetune separate {100*correct_finetuned_both.sum()/correct_finetuned_both.shape[0]} %')

## Fully connected to both models

In [81]:
# Second pair of 512 -> 512 linear layers; the image layer and the text
# layer of this pair are trained together.
extra_image_layer2 = nn.Linear(in_features=512, out_features=512).to(device)
extra_text_layer2 = nn.Linear(in_features=512, out_features=512).to(device)

In [82]:
# A single Adam optimizer updates the parameters of both extra layers.
joint_parameters = [*extra_image_layer2.parameters(), *extra_text_layer2.parameters()]
optimizer = torch.optim.Adam(joint_parameters, lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
# Train the extra image layer and the extra text layer together on the
# training split.
# `optimizer` is whatever the preceding cell created; there it is built from
# the parameters of extra_image_layer2 and extra_text_layer2.
show_image = True
model.train()
loss_list = []  # summed batch loss, one entry per epoch
epochs = 20
# NOTE(review): this loader over the full `dataset` is not used by the loop
# below, which iterates dataloader_train. Kept so the name stays defined.
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
for epoch in tqdm(range(epochs)):
    running_loss = 0.0
    # `images` is not needed for training; the former `images.to(model.device)`
    # discarded its result, so that per-batch transfer has been removed.
    for batch_nr, (image_embeds, labels, images) in enumerate(dataloader_train):
        optimizer.zero_grad()
        text = list(labels) # for training, must use 1-1 map
        text_embeds = model_functions.get_text_emb(model, processor, text)
        text_embeds = extra_text_layer2(text_embeds)
        image_embeds = extra_image_layer2(image_embeds)
        logits_per_image, loss = model_functions.apply_clip(text_embeds, image_embeds, model, train=True)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    loss_list.append(running_loss)

In [None]:
# Evaluate on the test split using the jointly trained pair of layers:
# extra_text_layer2 on the class text embeddings and extra_image_layer2 on
# the image embeddings.
show_image = True
all_predictions_both_fintune_same = []
all_labels = []
model.eval()  # the training cell left the model in train mode
with torch.no_grad():
    # Fix: this cell called model_functions.get_both_emb, while every other
    # cell obtains the class text embeddings through get_text_emb.
    # Prompts and their projection are the same for every batch, so they are
    # computed once.
    text = [' '+i for i in dataset.classes] # for prediction
    both_embeds = model_functions.get_text_emb(model, processor, text)
    both_embeds = extra_text_layer2(both_embeds)

    for batch_nr, (image_embeds, labels, images) in enumerate(tqdm(dataloader_test)):
        image_embeds = extra_image_layer2(image_embeds)
        logits_per_image, loss = model_functions.apply_clip(both_embeds, image_embeds, model)

        if show_image and batch_nr % 40 == 0:
            images = utils.return_normal(images, processor, 4, True)

        # Index of the highest-scoring class prompt for every image.
        predicted_class_base = logits_per_image.argmax(dim=-1)

        all_predictions_both_fintune_same.append(predicted_class_base)
        # Translate the ground-truth class names into ids for comparison.
        all_labels.extend(dataset.class_to_id[lab] for lab in labels)

all_predictions_both_fintune_same = torch.cat(all_predictions_both_fintune_same).cpu()

correct_fintuned_both_same = all_predictions_both_fintune_same == torch.tensor(all_labels).cpu()
print(f'\n Accuracy both_fintune_same {100*correct_fintuned_both_same.sum()/correct_fintuned_both_same.shape[0]} %')

**Evaluate performance**

In [None]:
# Summary of all accuracies next to the random-guess level.
# Fix: the class count was taken from max(all_labels), which is a class id
# rather than a count, and the chance level was printed as a fraction while
# labelled "%". The predictions choose among len(dataset.classes) prompts,
# so that is the number of classes; chance is reported as a percentage.
n_classes = len(dataset.classes)
print(f'\nOf {n_classes} classes random is {100/n_classes}%')
print(f'Accuracy baseline {100*correct_base.sum()/correct_base.shape[0]} %')
print(f'Accuracy image_finetune {100*correct_finetuned_image.sum()/correct_finetuned_image.shape[0]} %')
print(f'Accuracy text_finetune {100*correct_finetuned_text.sum()/correct_finetuned_text.shape[0]} %')
print(f'Accuracy both_finetune separate {100*correct_finetuned_both.sum()/correct_finetuned_both.shape[0]} %')
print(f'Accuracy both_fintune_same {100*correct_fintuned_both_same.sum()/correct_fintuned_both_same.shape[0]} %')
from sklearn.metrics import classification_report
# Mapping from class name to the integer id used for the labels above.
print(dataset.class_to_id)