In [None]:
! git clone https://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@github.com/DLCV-Fall-2021/hw3-SonicBenz0408.git
! bash ./hw3-SonicBenz0408/get_dataset.sh
! pip install -r /content/hw3-SonicBenz0408/requirements.txt

In [None]:
import random

import torch
import numpy as np


def same_seeds(seed):
    """Seed every random number generator this notebook uses.

    Args:
        seed: Integer seed handed to Python's ``random``, NumPy's global RNG
            and PyTorch (CPU, plus all CUDA devices when CUDA is available).

    Also switches cuDNN to its deterministic mode and turns its benchmark
    (auto-tuning) mode off.
    """
    # CPU-side generators: Python built-in, NumPy, Torch.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # CUDA generators are only seeded when a GPU is actually present.
    if torch.cuda.is_available():
        for cuda_seed_fn in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            cuda_seed_fn(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


# Fix the seed once, before any dataset shuffling or model initialisation happens.
same_seeds(7414)

In [None]:
from pytorch_pretrained_vit import ViT
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch import optim
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils import spectral_norm
from PIL import Image
import matplotlib.pyplot as plt
import random
import timm

In [None]:
class ImgDataset(Dataset):
    """Image dataset whose integer label is encoded in each file name.

    Every file name must look like ``<label>_<rest>``: the text before the
    first underscore is parsed with ``int`` and returned as the label.

    Args:
        path: Directory that contains the image files.
        fnames: File names (relative to ``path``) that belong to this split.
        transform: Callable applied to the RGB ``PIL.Image`` of each sample.
    """

    def __init__(self, path, fnames, transform):
        self.path = path
        self.fnames = fnames
        self.transform = transform
        self.num_samples = len(self.fnames)

    def __getitem__(self, idx):
        """Return ``(transformed_image, label)`` for sample ``idx``.

        Raises:
            ValueError: If the file name has no underscore or the text before
                it is not an integer.
        """
        fname = self.fnames[idx]
        label_text, separator, _ = fname.partition("_")
        if not separator:
            # str.find would return -1 here and silently chop the last character
            # off the name; fail loudly on files that do not follow the convention.
            raise ValueError(f"file name {fname!r} does not follow '<label>_<rest>'")
        label = int(label_text)

        # Image.open is lazy and keeps the file open; convert() forces the pixel
        # data to be read, after which the handle can be released right away.
        with Image.open(os.path.join(self.path, fname)) as raw_img:
            img = raw_img.convert("RGB")
        img = self.transform(img)
        return img, label

    def __len__(self):
        """Number of samples in this split."""
        return self.num_samples


In [None]:
# Training-time augmentation: resize to the 384x384 model input, then random flip,
# brightness jitter and affine distortion, and finally scale pixels to [-1, 1].
t_tfm = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.3),
    transforms.RandomAffine(degrees=20, scale=(0.7, 1.3)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Validation / test transform: deterministic resize and the same normalisation.
v_tfm = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

train_path = "/content/hw3_data/p1_data/train/"
# os.listdir returns entries in arbitrary, filesystem-dependent order. Sorting
# first makes the seeded shuffle below produce the same split on every machine.
total_fnames = sorted(os.listdir(train_path))
random.shuffle(total_fnames)

# Hold out one tenth of the shuffled files for validation.
n_val = len(total_fnames) // 10
train_fnames = total_fnames[n_val:]
val_fnames = total_fnames[:n_val]

train_fnames.sort()
val_fnames.sort()

train_set = ImgDataset(train_path, train_fnames, t_tfm)
val_set = ImgDataset(train_path, val_fnames, v_tfm)

In [None]:
# Preview the first 100 augmented training samples as a 10 x 10 grid.
images = []
for sample_idx in range(100):
    img_tensor = train_set[sample_idx][0]
    # Undo Normalize(mean=0.5, std=0.5): map [-1, 1] back to [0, 1] for display.
    images.append((img_tensor + 1) / 2)

grid_img = torchvision.utils.make_grid(images, nrow=10)

fig, ax = plt.subplots(figsize=(10, 10))
# make_grid returns channels-first data; imshow wants channels last.
ax.imshow(grid_img.permute(1, 2, 0))
plt.show()

In [None]:
from torch.optim.lr_scheduler import _LRScheduler
# from package: transformers

def get_linear_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, last_epoch=-1):
    """Build a LambdaLR schedule with linear warmup followed by linear decay.

    The learning-rate multiplier rises linearly from 0 to 1 over the first
    ``num_warmup_steps`` scheduler steps, then falls linearly so that it
    reaches 0 at ``num_training_steps`` (and is clamped at 0 afterwards).

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        num_warmup_steps: Number of steps in the warmup phase.
        num_training_steps: Total number of scheduler steps.
        last_epoch: Forwarded to ``LambdaLR`` (index of the last step when resuming).

    Returns:
        A ``torch.optim.lr_scheduler.LambdaLR`` instance.
    """
    # Both denominators are constant; max(1, ...) guards against division by zero.
    warmup_span = float(max(1, num_warmup_steps))
    decay_span = float(max(1, num_training_steps - num_warmup_steps))

    def lr_lambda(current_step: int):
        if current_step >= num_warmup_steps:
            steps_left = float(num_training_steps - current_step)
            return max(0.0, steps_left / decay_span)
        return float(current_step) / warmup_span

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda, last_epoch)

In [None]:
import timm
# Ask timm for the names of the models it lists with pretrained=True, to pick a backbone from.
all_pretrained_models_available = timm.list_models(pretrained=True)

In [None]:
# Dump the complete list of model names collected above (the output is long).
print(all_pretrained_models_available)

In [None]:
# Training hyperparameters
batch_size = 32
lr = 1e-4
n_epoch = 100

# Directory that receives the best checkpoint during training.
ckpt_dir = os.path.join("/content/", 'checkpoints')
os.makedirs(ckpt_dir, exist_ok=True)

# Model
# The architecture has to match the one that the evaluation and visualisation
# cells rebuild before calling load_state_dict ("vit_small_patch16_384", whose
# attention maps are later reshaped to 6 heads); training a different variant
# produces a checkpoint those cells cannot load.
#model = ViT('L_32', pretrained=True, patches=14, num_classes=37).cuda()
model = timm.create_model("vit_small_patch16_384", pretrained=True, num_classes=37).cuda()
# Loss
criterion = nn.CrossEntropyLoss()

# Optimizer
opt = torch.optim.AdamW(model.parameters(), lr=lr)

# DataLoader
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2)
# Validation metrics do not depend on sample order, so the loader is not shuffled.
val_loader = DataLoader(val_set, batch_size=1, shuffle=False, num_workers=2)
# The scheduler is stepped once per batch: decay the learning rate linearly from
# 1.0x to 0.01x over the first 10 epochs' worth of iterations.
sch = torch.optim.lr_scheduler.LinearLR(opt, 1.0, 0.01, total_iters=len(train_loader)*10)
#sch = get_linear_schedule_with_warmup(opt, len(train_loader) * 2 , len(train_loader) * (n_epoch))

In [None]:
# Show the module tree of the network that is about to be trained.
print(model)

In [None]:
# Per-epoch validation history (plain Python floats) and the best accuracy so far.
loss_list = []
acc_list = []
best_acc = 0.

for epoch in range(n_epoch):
    # ---- training ----
    model.train()
    for imgs, labels in train_loader:
        imgs, labels = imgs.cuda(), labels.cuda()

        logits = model(imgs)
        loss = criterion(logits, labels)

        opt.zero_grad()
        loss.backward()

        opt.step()
        # The learning-rate schedule advances once per batch, not once per epoch.
        sch.step()

    # ---- validation ----
    model.eval()
    val_loss = 0.
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs, labels = imgs.cuda(), labels.cuda()

            logits = model(imgs)
            n_batch = labels.size(0)
            # criterion returns the batch mean; weight it by the batch size so the
            # epoch average is per sample whatever batch size the loader uses.
            # .item() keeps CUDA tensors out of the history lists.
            val_loss += criterion(logits, labels).item() * n_batch
            # Count hits element-wise instead of `if preds == labels`, which is
            # only well-defined for a batch of exactly one sample.
            n_correct += (logits.argmax(dim=-1) == labels).sum().item()
            n_seen += n_batch

    val_loss /= n_seen
    acc = n_correct / n_seen
    loss_list.append(val_loss)
    acc_list.append(acc)
    print(f'epoch {epoch+1}: acc={acc:.4f}, loss={val_loss:.4f}')

    # Keep only the checkpoint with the best validation accuracy.
    if acc > best_acc:
        best_acc = acc
        print("save best model with acc =", best_acc)
        torch.save(model.state_dict(), os.path.join(ckpt_dir, "model.ckpt"))

In [None]:
# Store the weights currently held by `model` under a second file name in ckpt_dir.
# NOTE(review): right after the training loop `model` holds the final-epoch weights,
# not necessarily the best-accuracy ones written to model.ckpt — confirm that this
# is the snapshot the "9406" in the file name refers to.
torch.save(model.state_dict(), os.path.join(ckpt_dir, "model9406.ckpt"))

In [None]:
# Measure top-1 accuracy of the best checkpoint on the official validation folder.
test_path = "/content/hw3_data/p1_data/val/"
test_fnames = sorted(os.listdir(test_path))

test_set = ImgDataset(test_path, test_fnames, v_tfm)
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, num_workers=2)

model = timm.create_model("vit_small_patch16_384", num_classes=37).cuda()
# Load from the directory the training loop saves to (ckpt_dir); the previous
# hardcoded "/content/model.ckpt" is not a location this notebook ever writes.
model.load_state_dict(torch.load(os.path.join(ckpt_dir, "model.ckpt")))
model.eval()

hit = 0
# Inference only: skip autograd bookkeeping to save memory and time.
with torch.no_grad():
    for img, label in test_loader:
        img, label = img.cuda(), label.cuda()

        pred = model(img).argmax(dim=-1)
        # Compare integer class indices directly, element-wise.
        hit += (pred == label).sum().item()

acc = hit / len(test_set)
print("acc=", acc)

In [None]:
# Collect the predicted class index for every image, in sorted file-name order.
test_path = "/content/hw3_data/p1_data/val/"
test_fnames = sorted(os.listdir(test_path))

test_set = ImgDataset(test_path, test_fnames, v_tfm)
# shuffle=False keeps pred_list aligned with test_fnames.
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, num_workers=2)

model = timm.create_model("vit_small_patch16_384", num_classes=37).cuda()
# Load from the directory the training loop saves to (ckpt_dir); the previous
# hardcoded "/content/model.ckpt" is not a location this notebook ever writes.
model.load_state_dict(torch.load(os.path.join(ckpt_dir, "model.ckpt")))
model.eval()

pred_list = []
# Inference only: skip autograd bookkeeping to save memory and time.
with torch.no_grad():
    for img, label in test_loader:
        img, label = img.cuda(), label.cuda()
        logits = model(img)
        pred = logits.argmax(dim=-1).int().item()
        pred_list.append(pred)

In [None]:
# Write one "filename,label" row per prediction, preceded by a header line.
with open("/content/pred.csv", "w") as csv_file:
    csv_file.write("filename,label\n")
    for row_idx, predicted_label in enumerate(pred_list):
        # pred_list is aligned with test_fnames by position.
        csv_file.write(f"{test_fnames[row_idx]},{predicted_label}\n")

In [None]:
from types import MethodType
def new_forward_features(self, x):
    """Replacement ``forward_features`` that returns every token after the blocks.

    The input is patch-embedded, the class token (and the distillation token,
    when the model has one) is prepended, position embeddings are added and the
    sequence is run through ``self.blocks``. The result of ``self.blocks`` is
    returned as-is for all tokens.

    Meant to be bound to a model instance with ``types.MethodType``.
    """
    patches = self.patch_embed(x)
    batch = patches.shape[0]

    # Learned prefix tokens are stored with batch size 1; broadcast them to the batch.
    prefix = [self.cls_token.expand(batch, -1, -1)]
    if self.dist_token is not None:
        prefix.append(self.dist_token.expand(batch, -1, -1))

    tokens = torch.cat(prefix + [patches], dim=1)
    tokens = self.pos_drop(tokens + self.pos_embed)
    return self.blocks(tokens)

def new_forward(self, x):
    """Replacement ``forward``: return whatever ``self.forward_features`` produces.

    Nothing else is applied to the features. Meant to be bound to a model
    instance with ``types.MethodType``.
    """
    return self.forward_features(x)
def block_forward(self, x):
    """Replacement block ``forward``: return only ``self.attn(self.norm1(x))``.

    Nothing else in the block is evaluated. Meant to be bound to a single
    block with ``types.MethodType``.
    """
    normed = self.norm1(x)
    return self.attn(normed)
def attn_forward(self, x):
    """Replacement attention ``forward`` that returns the attention weights.

    Args:
        x: Tensor of shape ``(B, N, C)``.

    Returns:
        Softmax-normalised attention weights of shape ``(B, num_heads, N, N)``
        after ``self.attn_drop``; the values are never multiplied in.
    """
    batch, n_tokens, channels = x.shape
    head_dim = channels // self.num_heads

    # One projection yields q, k and v; lay it out as (3, B, heads, N, head_dim).
    projected = self.qkv(x).reshape(batch, n_tokens, 3, self.num_heads, head_dim)
    queries, keys, _values = projected.permute(2, 0, 3, 1, 4).unbind(0)

    # Scaled dot-product scores, normalised over the key dimension.
    scores = (queries @ keys.transpose(-2, -1)) * self.scale
    weights = scores.softmax(dim=-1)
    return self.attn_drop(weights)

In [None]:
from sklearn.metrics import pairwise_distances
def cosine_similarity(a, b):
    """Cosine similarity of two NumPy vectors.

    Returns ``inner(a, b) / (||a|| * ||b||)``. No guard is applied for
    zero-length vectors.
    """
    norm_product = np.linalg.norm(a) * np.linalg.norm(b)
    return np.inner(a, b) / norm_product

In [None]:
# Run one validation image through the trained model and keep its token features.
test_path = "/content/hw3_data/p1_data/val/"
test_fnames = sorted(os.listdir(test_path))

test_set = ImgDataset(test_path, test_fnames, v_tfm)
test_loader = DataLoader(test_set, batch_size=1, shuffle=False, num_workers=2)

model = timm.create_model("vit_small_patch16_384", num_classes=37).cuda()
model.forward = MethodType(new_forward, model)
# The later cell that consumes `embed` indexes it as (1, tokens, channels), so the
# model must return every token. Bind new_forward_features as well, the same way
# the attention-visualisation cell does.
# NOTE(review): with the timm release that exposes `dist_token`, the stock
# forward_features appears to return only the pooled class token — confirm
# against the installed timm version.
model.forward_features = MethodType(new_forward_features, model)
model.load_state_dict(torch.load("/content/drive/MyDrive/Hw3/model.ckpt"))
model.eval()

# Only the first sample is needed; no gradients are required for inspection.
with torch.no_grad():
    img, label = next(iter(test_loader))
    img, label = img.cuda(), label.cuda()
    embed = model(img)

In [None]:
from PIL import Image, ImageDraw
#reference:https://nbviewer.org/github/luo3300612/Visualizer/blob/main/demo.ipynb
def cls_padding(image, mask, cls_weight, grid_size):
    """Prepend a one-cell-wide strip for the CLS token to an image and its mask.

    Args:
        image: PIL image (or array) of the picture being visualised.
        mask: Attention mask with the same height and width as ``image``.
        cls_weight: Scalar attention weight assigned to the CLS token.
        grid_size: Number of grid cells as ``(rows, cols)``, or one int for a
            square grid.

    Returns:
        ``(padded_image, padded_mask, meta_mask)``:
        * ``padded_image`` - PIL image with a white strip, labelled 'CLS', on the left;
        * ``padded_mask`` - float array of the same height/width; mask and CLS
          weight are divided by their common maximum, and the CLS weight fills
          the top-left grid cell;
        * ``meta_mask`` - 4-channel array that is 1 over the part of the strip
          below the CLS cell and 0 elsewhere.
    """
    if not isinstance(grid_size, tuple):
        grid_size = (grid_size, grid_size)

    image = np.array(image)
    # Work on an explicit float array so later arithmetic is well-defined even
    # when the mask arrives as a PIL image.
    mask = np.asarray(mask, dtype=np.float64)

    H, W = image.shape[:2]
    delta_H = int(H/grid_size[0])
    delta_W = int(W/grid_size[1])

    # White strip, one grid cell wide, glued to the left of the picture.
    image_padding = np.full((H, delta_W) + image.shape[2:], 255, dtype=image.dtype)
    padded_image = Image.fromarray(np.hstack((image_padding, image)))
    draw = ImageDraw.Draw(padded_image)
    draw.text((int(delta_W/4),int(delta_H/4)),'CLS', fill=(0,0,0)) # PIL.Image.size = (W,H) not (H,W)

    # Normalise mask and CLS weight by the SAME maximum. Recomputing the maximum
    # after the mask has already been divided would scale the two differently.
    scale = max(np.max(mask), cls_weight)
    if scale > 0:
        mask = mask / scale
        cls_weight = cls_weight / scale

    # Build the mask strip as floats. Writing the weights into a uint8 array,
    # as the image strip is, would truncate every value below 1 to 0.
    mask_padding = np.full((H, delta_W), np.min(mask), dtype=mask.dtype)
    mask_padding[:delta_H, :delta_W] = cls_weight
    padded_mask = np.hstack((mask_padding, mask))

    # 4-channel overlay: 1 in every channel below the CLS cell of the strip.
    meta_mask = np.zeros((padded_mask.shape[0], padded_mask.shape[1],4))
    meta_mask[delta_H:,0: delta_W, :] = 1

    return padded_image, padded_mask, meta_mask
def visualize_grid_to_grid_with_cls(att_map, grid_index, image, grid_size=14, alpha=0.6):
    """Plot the attention of one query token over the image, CLS column included.

    Args:
        att_map: 2-D array indexed ``[query_token, key_token]``; token 0 is
            treated as CLS, the remaining ``rows * cols`` tokens as patches.
        grid_index: Index of the query token (0 selects CLS).
        image: PIL image the attention is drawn over.
        grid_size: Patch grid as ``(rows, cols)`` or one int for a square grid.
        alpha: Opacity of the attention overlay.

    Draws a two-panel figure: the padded image with the query cell outlined,
    and the same image with the attention mask overlaid.
    """
    if not isinstance(grid_size, tuple):
        grid_size = (grid_size, grid_size)
    n_rows, n_cols = grid_size[0], grid_size[1]

    # Row of the attention matrix that belongs to the chosen query token.
    query_attention = att_map[grid_index]
    cls_weight = query_attention[0]

    # Patch weights -> 2-D grid -> upsampled to the image resolution.
    patch_grid = query_attention[1:].reshape(n_rows, n_cols)
    mask = Image.fromarray(patch_grid).resize((image.size))

    padded_image, padded_mask, meta_mask = cls_padding(image, mask, cls_weight, grid_size)

    # The padded picture has one extra column on the left, so every patch index
    # shifts by one per row already passed (CLS itself stays at cell 0).
    if grid_index != 0:
        grid_index = grid_index + (grid_index-1) // n_cols

    grid_image = highlight_grid(padded_image, [grid_index], (n_rows, n_cols+1))

    fig, (image_ax, overlay_ax) = plt.subplots(1, 2, figsize=(10,7))
    fig.tight_layout()

    image_ax.imshow(grid_image)
    image_ax.axis('off')

    overlay_ax.imshow(grid_image)
    overlay_ax.imshow(padded_mask, alpha=alpha, cmap='rainbow')
    overlay_ax.imshow(meta_mask)
    overlay_ax.axis('off')
def highlight_grid(image, grid_indexes, grid_size=14):
    """Return a copy of ``image`` with the given grid cells outlined in red.

    Args:
        image: PIL image; it is copied, never modified.
        grid_indexes: Iterable of flat, row-major cell indices to outline.
        grid_size: Grid as ``(rows, cols)`` or one int for a square grid.
    """
    if not isinstance(grid_size, tuple):
        grid_size = (grid_size, grid_size)
    n_rows, n_cols = grid_size[0], grid_size[1]

    # PIL reports size as (width, height).
    img_w, img_h = image.size
    cell_h = img_h / n_rows
    cell_w = img_w / n_cols

    canvas = image.copy()
    for cell_index in grid_indexes:
        row, col = np.unravel_index(cell_index, (n_rows, n_cols))
        left, top = col*cell_w, row*cell_h
        pen = ImageDraw.ImageDraw(canvas)
        pen.rectangle([(left, top), (left+cell_w, top+cell_h)], fill=None, outline='red', width=2)
    return canvas

In [None]:
def load_image_tensor(image_path):
    """Load one image with the validation transform as a batch of one on the GPU.

    Args:
        image_path: Path of the image file.

    Returns:
        CUDA tensor of shape ``(1, C, H, W)``.
    """
    # Close the file handle as soon as the pixels have been read.
    with Image.open(image_path) as raw_img:
        tensor = v_tfm(raw_img.convert("RGB"))
    # Add the leading batch dimension the model expects.
    return tensor.unsqueeze(0).cuda()


# Three validation pictures used for the attention visualisations.
pic_1_path = "/content/hw3_data/p1_data/val/26_5064.jpg"
pic_1 = load_image_tensor(pic_1_path)
pic_2_path = "/content/hw3_data/p1_data/val/29_4718.jpg"
pic_2 = load_image_tensor(pic_2_path)
pic_3_path = "/content/hw3_data/p1_data/val/31_4838.jpg"
pic_3 = load_image_tensor(pic_3_path)


# Rebuild the network and rewire it so that calling the model returns the
# attention weights of the last block instead of class logits.
model = timm.create_model("vit_small_patch16_384", num_classes=37).cuda()
model.forward = MethodType(new_forward, model)
model.forward_features = MethodType(new_forward_features, model)
model.blocks[-1].forward = MethodType(block_forward, model.blocks[-1])
model.blocks[-1].attn.forward = MethodType(attn_forward, model.blocks[-1].attn)
model.load_state_dict(torch.load("/content/drive/MyDrive/Hw3/model.ckpt"))
model.eval()
print("over")

In [None]:
# The patched model returns last-block attention weights shaped (1, heads, tokens, tokens).
with torch.no_grad():
    att_mat = model(pic_1)
# Drop the batch dimension by indexing rather than reshaping to hard-coded
# (6, 577, 577), then average over the heads.
att_mat = att_mat[0].mean(dim=0).cpu().detach().numpy()

In [None]:
# Attention of the CLS token (query index 0) over picture 1, on the 24 x 24 patch grid.
query_image = Image.open(pic_1_path).convert("RGB").resize((384, 384))
visualize_grid_to_grid_with_cls(att_mat, 0, query_image, grid_size=24, alpha=0.9)

In [None]:
# Take the single sample in the batch and drop token 0 (presumably the class
# token, which new_forward_features puts first — confirm `embed` came from it).
# A new name is used instead of overwriting `embed`, so the cell can be re-run.
patch_tokens = embed[0, 1:].cpu().detach().numpy()
# Gram matrix of the patch tokens, scaled by its Frobenius norm.
norm_embed = np.matmul(patch_tokens, patch_tokens.transpose())
norm_embed /= np.linalg.norm(norm_embed)
# One row per patch, arranged on the 24 x 24 patch grid.
norm_embed = np.reshape(norm_embed, (24, 24, 24*24))

In [None]:
# Cosine similarity between the feature rows of every pair of patches.
# total_cos_img[o_row][o_col] is the 24 x 24 similarity map of patch (o_row, o_col)
# against all patches. One matrix product replaces the four nested Python loops
# and yields the same values up to floating-point rounding.
flat_rows = norm_embed.reshape(24 * 24, -1)
unit_rows = flat_rows / np.linalg.norm(flat_rows, axis=1, keepdims=True)
total_cos_img = (unit_rows @ unit_rows.T).reshape(24, 24, 24, 24)

In [None]:
# Lay the 24 x 24 similarity maps out in a 24 x 24 grid of small panels,
# one panel per reference patch, in row-major order.
fig, axes = plt.subplots(24, 24, figsize=(12, 12))
for row in range(24):
    for col in range(24):
        panel = axes[row, col]
        panel.imshow(total_cos_img[row][col])
        panel.axis('off')
plt.show()