# Unzip dataset on Kaggle

In [None]:
import os
import zipfile 
local_zip = '/kaggle/working/archive (2).zip'
zip_ref = zipfile.ZipFile(local_zip, 'r')
zip_ref.extractall('/kaggle/working')
zip_ref.close()

In [6]:
!pip install einops

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl.metadata (13 kB)
Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.7.0


In [19]:
import torch
import torch.nn as nn
import einops
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.functional as F

In [9]:
def get_device():
    if torch.cuda.is_available():
        return torch.device("cuda")
    return torch.device("cpu")

def to_device(data,device):
    if isinstance(data,(list,tuple)):
        return [to_device(x,device) for x in data]
    return data.to(device,non_blocking=True)


class ToDeviceLoader:
    def __init__(self,data,device):
        self.data = data
        self.device = device

    def __iter__(self):
        for batch in self.data:
            yield to_device(batch,self.device)

    def __len__(self):
        return len(self.data)

# Load, preprocess data

In [10]:
test_path = '/kaggle/working/seg_test'
train_path = '/kaggle/working/seg_train'

In [11]:
train_transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
])

test_transform = transforms.Compose([
    transforms.Resize((128,128)),
    transforms.ToTensor(),
])

In [12]:
train_dataset = datasets.ImageFolder(root= train_path, transform= train_transform)
test_dataset = datasets.ImageFolder(root = test_path, transform= test_transform)

In [13]:
torch.manual_seed(101)
train_loader = DataLoader(train_dataset, batch_size= 64, shuffle = True)
test_loader = DataLoader(test_dataset, batch_size= 64)

In [14]:
device = get_device()
train_loader = ToDeviceLoader(train_loader, device)
test_loader = ToDeviceLoader(test_loader, device)

# Patch Embedding

In [15]:
class PatchEmbedding(nn.Module):
    def __init__(self, patch_size,emb_size):
        super().__init__()
        self.patch_size = patch_size
        self.emb_size = emb_size
    def forward(self,images):
        n,c,h,w = images.shape
        image = einops.rearrange(images, 'b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = self.patch_size, p2 = self.patch_size)
        # image = nn.Linear(self.patch_size * self.patch_size * c, self.emb_size)(image)
        return image

# Transformer in ViT

In [16]:
class Transformer(nn.Module):
    def __init__(self, emb_dim ,n_heads, drop = 0, ratio_dim = 4):
        super().__init__()
        self.emb_dim = emb_dim
        self.ratio_dim = ratio_dim
        self.num_head = n_heads
        self.layer_norm1 = nn.LayerNorm(emb_dim)
        self.attn = nn.MultiheadAttention(embed_dim= self.emb_dim, num_heads= self.num_head)
        self.layer_norm2 = nn.LayerNorm(emb_dim)
        self.MLP = nn.Sequential(
            nn.Linear(emb_dim, emb_dim * ratio_dim),
            nn.GELU(),
            nn.Dropout(drop),
            nn.Linear(emb_dim * ratio_dim, emb_dim)
        )
    def forward(self,x):
        x = self.layer_norm1(x)
        x = x + self.attn(x,x,x)[0]
        x = x + self.MLP(self.layer_norm2(x))
        return x

# Model ViT

In [20]:
class MyViT(nn.Module):
    def __init__(self,n_layer, img_size,patch_size, emb_dim, n_heads, dropout, out_dim):
        super().__init__()
        self.num_layer = n_layer
        self.img_size = img_size
        self.patch_size = patch_size
        self.embed_dim = emb_dim
        self.num_head = n_heads
        self.drop = nn.Dropout(dropout)
        self.out_dim = out_dim
        n_patch = (img_size**2) // (patch_size ** 2)
        self.cls_token = nn.Parameter(torch.rand(1,emb_dim))
        self.positon = nn.Parameter(torch.rand(n_patch +1, emb_dim))
        self.PatchEmbed = PatchEmbedding(patch_size, emb_dim)
        self.linear1 = nn.Linear(patch_size * patch_size * 3, emb_dim)
        self.transformer = nn.ModuleList([Transformer(emb_dim,n_heads,dropout) for _ in range(n_layer)])
        self.linear2 = nn.Linear(emb_dim, out_dim)
    def forward(self,x):
        n,c,h,w = x.shape
        x = self.PatchEmbed(x)
        x = self.linear1(x)
        x = torch.stack([torch.vstack((self.cls_token, x[i])) for i in range(n)])
        z = x + self.positon.repeat(n,1,1)
        for block in self.transformer:
            z =  block(z)

        out = self.linear2(z)
        out = out[:,0]
        
        return out

In [21]:
model = MyViT(12, 128, 16, 768, 12,0, 6)
print(model)

MyViT(
  (drop): Dropout(p=0, inplace=False)
  (PatchEmbed): PatchEmbedding()
  (linear1): Linear(in_features=768, out_features=768, bias=True)
  (transformer): ModuleList(
    (0-11): 12 x Transformer(
      (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
      )
      (layer_norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (MLP): Sequential(
        (0): Linear(in_features=768, out_features=3072, bias=True)
        (1): GELU(approximate='none')
        (2): Dropout(p=0, inplace=False)
        (3): Linear(in_features=3072, out_features=768, bias=True)
      )
    )
  )
  (linear2): Linear(in_features=768, out_features=6, bias=True)
)


In [22]:
model.cuda()

MyViT(
  (drop): Dropout(p=0, inplace=False)
  (PatchEmbed): PatchEmbedding()
  (linear1): Linear(in_features=768, out_features=768, bias=True)
  (transformer): ModuleList(
    (0-11): 12 x Transformer(
      (layer_norm1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (attn): MultiheadAttention(
        (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
      )
      (layer_norm2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (MLP): Sequential(
        (0): Linear(in_features=768, out_features=3072, bias=True)
        (1): GELU(approximate='none')
        (2): Dropout(p=0, inplace=False)
        (3): Linear(in_features=3072, out_features=768, bias=True)
      )
    )
  )
  (linear2): Linear(in_features=768, out_features=6, bias=True)
)

In [23]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)

In [26]:
epoch = 1

In [25]:
train_epoch_loss = []
train_epoch_acc = []
test_epoch_loss = []
test_epoch_acc = []

In [28]:
for i in range(epoch):
    trn_corr = 0
    tst_corr = 0
    train_losses = []
    train_correct = []
    # Run the training batches
    for b, (X_train, y_train) in enumerate(train_loader):
        y_pred = model(X_train)
        loss = criterion(y_pred, y_train)

        predicted = torch.max(y_pred.data, 1)[1]
        batch_corr = (predicted == y_train).sum()
        trn_corr += batch_corr

        # Update parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_losses.append(loss.item())

    epoch_train_loss = np.mean(train_losses)
    train_correct.append(trn_corr)
    train_acc = trn_corr/len(train_dataset)
    train_epoch_loss.append(epoch_train_loss)
    train_epoch_acc.append(train_acc)
    

    test_corr = 0
    for b, (X_test,y_test) in enumerate(test_loader):
      y_pred = model(X_test)
      predicted = torch.max(y_pred.data, 1)[1]
      batch_corr = (predicted == y_test).sum()
      test_corr += batch_corr
    test_acc = test_corr/len(test_dataset)
    test_epoch_acc.append(test_acc)
    print(f'Epoch {i+1}')
    print(f'train_loss : {epoch_train_loss}')
    print(f'train_accuracy : {train_acc*100}')
    print(f'test accuracy: {test_acc*100}')
    print(25*'==')

Epoch 1
train_loss : 0.0
train_accuracy : 99.99999237060547
test accuracy: 100.0
