<a href="https://colab.research.google.com/github/JEN6YT/APS360-Project/blob/main/ViT_Primary_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from PIL import Image, ImageDraw
import pandas as pd
import os
import torchvision.transforms as transforms
import torch
import random
import numpy as np
from imblearn.over_sampling import RandomOverSampler
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
# import Pytorch Library
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

# import matplotlib lirary
import matplotlib.pyplot as plt
from torchsummary import summary
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
X_train = np.load('/content/drive/My Drive/U of T/APS360 Deep Learning/x_train_data.npy', allow_pickle=True)

In [None]:
y_train = np.load('/content/drive/My Drive/U of T/APS360 Deep Learning/y_train_data.npy', allow_pickle=True)

In [None]:
X_val = np.load('/content/drive/My Drive/U of T/APS360 Deep Learning/x_val_data.npy', allow_pickle=True)

In [None]:
y_val = np.load('/content/drive/My Drive/U of T/APS360 Deep Learning/y_val_data.npy', allow_pickle=True)

In [None]:
y_val = y_val[:20]
X_val = X_val[:20]
X_train = X_train[:100]
y_train = y_train[:100]

In [None]:
X_train = np.float32(X_train)
X_val = np.float32(X_val)

In [None]:
X_train = np.transpose(X_train, (0,3,2,1))
X_val = np.transpose(X_val, (0,3,2,1))

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class MyDataset(Dataset):
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = targets
        self.transform = transform

    def __getitem__(self, index):
        x = self.data[index]
        y = self.targets[index]

        if self.transform:
            x = self.transform(x)

        return x, y

    def __len__(self):
        return len(self.data)

In [None]:
train_dataset = MyDataset(X_train, y_train)
val_dataset = MyDataset(X_val, y_val)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, num_workers=1, shuffle=True)
#test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, num_workers=4 , shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=32, num_workers=1 , shuffle=True)

use_cuda = True

print("Training dataset size: ", len(train_dataset))
print("Validation dataset size: ", len(val_dataset))

Training dataset size:  100
Validation dataset size:  20


In [None]:
import torch
from torch import nn
from einops.layers.torch import Rearrange
from einops import repeat


In [None]:

class PatchEmbedding(nn.Module):
    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768, img_size: int = 224):
        self.patch_size = patch_size
        super().__init__()
        self.projection = nn.Sequential(
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            Rearrange('b e (h) (w) -> b (h w) e'),
        ) # this breaks down the image in s1xs2 patches, and then flat them
        
        self.cls_token = nn.Parameter(torch.randn(1,1, emb_size))
        self.positions = nn.Parameter(torch.randn((img_size // patch_size) **2 + 1, emb_size))

        
    def forward(self, x: Tensor) -> Tensor:
        b, _, _, _ = x.shape
        x = self.projection(x)
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=b)
        x = torch.cat([cls_tokens, x], dim=1) #prepending the cls token
        x += self.positions
        return x

class MultiHeadAttention(nn.Module):
    def __init__(self, emb_size: int = 768, num_heads: int = 8, dropout: float = 0):
        super().__init__()
        self.emb_size = emb_size
        self.num_heads = num_heads
        self.qkv = nn.Linear(emb_size, emb_size * 3) # queries, keys and values matrix
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)
        
    def forward(self, x : Tensor, mask: Tensor = None) -> Tensor:
        # split keys, queries and values in num_heads
        qkv = rearrange(self.qkv(x), "b n (h d qkv) -> (qkv) b h n d", h=self.num_heads, qkv=3)
        queries, keys, values = qkv[0], qkv[1], qkv[2]
        # sum up over the last axis
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) # batch, num_heads, query_len, key_len
        
        if mask is not None:
            fill_value = torch.finfo(torch.float32).min
            energy.mask_fill(~mask, fill_value)
            
        scaling = self.emb_size ** (1/2)
        
        att = F.softmax(energy, dim=-1) / scaling
        att = self.att_drop(att)
        out = torch.einsum('bhal, bhlv -> bhav ', att, values) # sum over the third axis
        out = rearrange(out, "b h n d -> b n (h d)")
        out = self.projection(out)
        
        return out


class ResidualAdd(nn.Module):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn
        
    def forward(self, x, **kwargs):
        res = x
        x = self.fn(x, **kwargs)
        x += res
        return x


class FeedForwardBlock(nn.Sequential):
    def __init__(self, emb_size: int, L: int = 4, drop_p: float = 0.):
        super().__init__(
            nn.Linear(emb_size, L * emb_size),
            nn.GELU(),
            nn.Dropout(drop_p),
            nn.Linear(L * emb_size, emb_size),
        )


class TransformerEncoderBlock(nn.Sequential):
    def __init__(self, emb_size: int = 768, drop_p: float = 0., forward_expansion: int = 4,
                 forward_drop_p: float = 0.,
                 **kwargs):
                 
        super().__init__(
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                MultiHeadAttention(emb_size, **kwargs),
                nn.Dropout(drop_p)
            )),
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                FeedForwardBlock(
                    emb_size, L=forward_expansion, drop_p=forward_drop_p),
                nn.Dropout(drop_p)
            )
            ))
        
class TransformerEncoder(nn.Sequential):
    def __init__(self, depth: int = 12, **kwargs):
        super().__init__(*[TransformerEncoderBlock(**kwargs) for _ in range(depth)])


class ClassificationHead(nn.Sequential):
    def __init__(self, emb_size: int = 768, n_classes: int = 1000):
        super().__init__(
            Reduce('b n e -> b e', reduction='mean'),
            nn.LayerNorm(emb_size), 
            nn.Linear(emb_size, n_classes))
        
class ViT(nn.Sequential):
    def __init__(self,     
                in_channels: int = 3,
                patch_size: int = 16,
                emb_size: int = 768,
                img_size: int = 224,
                depth: int = 12,
                n_classes: int = 2,
                **kwargs):
        super().__init__(
            PatchEmbedding(in_channels, patch_size, emb_size, img_size),
            TransformerEncoder(depth, emb_size=emb_size, **kwargs),
            ClassificationHead(emb_size, n_classes)
        )

In [None]:
def get_accuracy(model, data):
    correct, total = 0, 0
    for sms, labels in data:
        output = model(sms[0])
        pred = output.max(1, keepdim=True)[1]
        correct += pred.eq(labels.view_as(pred)).sum().item()
        total += labels.shape[0]
    return correct / total

In [None]:
def train_network(model, train, valid, num_epochs=5, batch_size = 64, learning_rate=1e-5):
    use_cuda = True

    print("epoch 0")
    optimizer = optim.Adam(model.parameters(), learning_rate)
    criterion = nn.CrossEntropyLoss()
    

    losses, train_acc, valid_acc = [], [], []
    epochs = []

    for epoch in range(num_epochs):
        print("ok")
        # for image in iter(train_loader):
        #     optimizer.zero_grad()
        #     # model.float()
        #     pred = model(image)
        #     loss = criterion(pred, image)
        #     loss.backward()
        #     optimizer.step()
        for imgs, labels in iter(train):

            if use_cuda and torch.cuda.is_available():
              imgs = imgs.cuda()
              labels = labels.cuda()
            
            # imgs = torch.from_numpy(imgs)
            # imgs = torch.unsqueeze(imgs,0)
            # imgs = imgs.numpy()
            print(imgs.shape)
            #labels = np.reshape(labels, (1,1))
            # labels = torch.tensor(labels, dtype=torch.int8)
            out = model(imgs)             # forward pass
            # out = torch.flatten(out)
            # out = out.type(torch.LongTensor)
            print(out.shape, labels.shape)

            loss = criterion(out, labels) # compute the total loss
            loss.backward()               # backward pass (compute parameter updates)
            optimizer.step()              # make the updates for each parameter
            optimizer.zero_grad()         # a clean up step for PyTorch
        
        losses.append(loss)
        epochs.append(epoch+1)
        train_acc.append(get_accuracy(model, train))
        valid_acc.append(get_accuracy(model, valid))
        print("Epoch %d; Loss %f; Train Acc %f; Val Acc %f" % (
              epoch+1, loss, train_acc[-1], valid_acc[-1]))


    # plotting
    plt.title("Training Curve")
    plt.plot(losses, label="Train")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.show()

    plt.title("Training Curve")
    plt.plot(epochs, train_acc, label="Train")
    plt.plot(epochs, valid_acc, label="Validation")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend(loc='best')
    plt.show()

In [None]:
net = VisionTransformer()
train_network(net, train_loader, val_loader)

epoch 0
ok
torch.Size([100, 3, 224, 224])
