<a href="https://colab.research.google.com/github/Arachne0/PyTorch/blob/master/ViT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install einops

In [None]:
#@title Patch Embedding
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

from torch import nn
from torch import Tensor
from PIL import Image
from torchvision.transforms import Compose, Resize, ToTensor
from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce
from torchsummary import summary

In [None]:
# Dummy batch of 8 RGB images of 224x224 pixels. It is placed on the GPU when
# CUDA is available and on the CPU otherwise, so the cell no longer crashes on
# machines without a GPU.
x = torch.randn(8, 3, 224, 224, device="cuda" if torch.cuda.is_available() else "cpu")
x.shape

In [None]:
patch_size = 16 # 16 pixels

print('x :', x.shape)
patches = rearrange(x, 'b c (h s1) (w s2) -> b (h w) (s1 s2 c)', s1=patch_size, s2=patch_size)
# The (B, C, H, W) image batch is cut into patch_size x patch_size patches and each patch is flattened,
# giving (B, N, P*P*C) where P is the patch size and N = (H/P) * (W/P) is the number of patches.
print('patches :', patches.shape)

In [None]:
patch_size = 16
in_channels = 3
emb_size = 768

# Patch projection: a strided convolution embeds every patch_size x patch_size
# patch into emb_size channels, then the spatial grid is flattened into a
# token axis. The layers are moved to the device of `x` instead of a
# hard-coded "cuda", so the cell works wherever the input lives.
projection = nn.Sequential(
            # using a conv layer instead of a linear one -> performance gains
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            Rearrange('b e (h) (w) -> b (h w) e'),
        ).to(x.device)

projection(x).shape

In [None]:
emb_size = 768
img_size = 224
patch_size = 16

# Split the images into patches and flatten them into a token sequence.
projected_x = projection(x)
print('Projected X shape :', projected_x.shape)

# Define the cls_token and positional-encoding parameters on the same device
# as the tokens. The tensor is moved BEFORE being wrapped, because calling
# .to() on an nn.Parameter returns a plain tensor rather than a Parameter.
cls_token = nn.Parameter(torch.randn(1,1, emb_size).to(projected_x.device))
positions = nn.Parameter(torch.randn((img_size // patch_size) **2 + 1, emb_size).to(projected_x.device))
print('Cls Shape :', cls_token.shape, ', Pos Shape :', positions.shape)

# Repeat cls_token along the batch axis; the batch size is read from the data
# so it always matches the input batch.
batch_size = projected_x.shape[0]
cls_tokens = repeat(cls_token, '() n e -> b n e', b=batch_size)
print('Repeated Cls shape :', cls_tokens.shape)

# Concatenate cls_tokens in front of the patch tokens (already on one device).
cat_x = torch.cat([cls_tokens, projected_x], dim=1)

# Add the positional encoding (broadcast over the batch axis).
cat_x = cat_x + positions
print('output : ', cat_x.shape)

In [None]:
class PatchEmbedding(nn.Module):
    """Embed an image batch as a token sequence with a class token and positions.

    A strided convolution maps every ``patch_size`` x ``patch_size`` patch to an
    ``emb_size`` vector, the patch grid is flattened into a token axis, a
    learnable class token is prepended, and a learnable position embedding is
    added.

    Args:
        in_channels: Number of channels of the input images.
        patch_size: Side length of a square patch (conv kernel size and stride).
        emb_size: Size of every token embedding.
        img_size: Side length of the input images; together with ``patch_size``
            it fixes the number of position embeddings,
            ``(img_size // patch_size) ** 2 + 1``.
    """

    def __init__(self, in_channels: int = 3, patch_size: int = 16, emb_size: int = 768, img_size: int = 224):
        super().__init__()
        self.patch_size = patch_size
        self.projection = nn.Sequential(
            # using a conv layer instead of a linear one -> performance gains
            nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size),
            Rearrange('b e (h) (w) -> b (h w) e'),
        )
        # Assign the nn.Parameter objects directly: calling .to("cuda") on a
        # Parameter returns a plain non-leaf tensor, which is neither
        # registered in parameters()/state_dict() nor updated by the optimizer.
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        self.positions = nn.Parameter(torch.randn((img_size // patch_size) ** 2 + 1, emb_size))
        # The notebook never moves the model explicitly, so keep placing the
        # module on the GPU when one exists, and fall back to the CPU otherwise.
        self.to("cuda" if torch.cuda.is_available() else "cpu")

    def forward(self, x: Tensor) -> Tensor:
        """Return ``(b, n_patches + 1, emb_size)`` tokens for images ``x`` of shape ``(b, c, h, w)``."""
        b = x.shape[0]
        x = self.projection(x)
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=b)
        # prepend the cls token to the input
        x = torch.cat([cls_tokens, x], dim=1)
        # add position embedding (broadcast over the batch axis)
        return x + self.positions

In [None]:
# Embed the image batch. The module is called directly rather than through
# .forward() so that any registered hooks run.
# NOTE: `x` is rebound to the embedded tokens, so re-run the cell that creates
# the image batch before running this cell again.
patch = PatchEmbedding()
x = patch(x)

In [None]:
#@title Linear Projection
emb_size = 768
num_heads = 8
# In ViT, Q, K and V are all computed from the same input tensor.
# The input goes through three linear projections, is split into several heads,
# and each head then performs scaled dot-product attention.
# The layers follow the device of `x` instead of a hard-coded .cuda().
queries= nn.Linear(emb_size, emb_size).to(x.device)
keys = nn.Linear(emb_size, emb_size).to(x.device)
values = nn.Linear(emb_size, emb_size).to(x.device)
print(keys, queries, values)

In [None]:
#@title Multi Head
# Apply each linear projection to x and split the embedding axis into
# `num_heads` heads: (b, n, h*d) -> (b, h, n, d).
# NOTE(review): each name is rebound from the projection layer to its output
# tensor, so this cell cannot be re-run without first re-running the cell
# that creates the layers.
queries = rearrange(queries(x), "b n (h d) -> b h n d", h=num_heads)
keys = rearrange(keys(x), "b n (h d) -> b h n d", h=num_heads)
values  = rearrange(values(x), "b n (h d) -> b h n d", h=num_heads)
# The Q, K and V obtained from the linear projections are split into 8 heads with rearrange.

print('shape :', queries.shape, keys.shape, values.shape)

In [None]:
#@title Scaled Dot-Product Attention
# Queries * Keys: pairwise dot products between query and key tokens per head.
# The result already lives on the device of its inputs.
energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys)
print('energy :', energy.shape)

# Get Attention Score: softmax(QK^T / sqrt(d_head)).
# The scores are scaled BEFORE the softmax, so each row still sums to 1, and
# the scale is the per-head dimension (last axis of the queries).
scaling = queries.shape[-1] ** (1/2)
att = F.softmax(energy / scaling, dim=-1)
print('att :', att.shape)

# Attention Score * values
out = torch.einsum('bhal, bhlv -> bhav ', att, values)
print('out :', out.shape)

# Rearrange back to emb_size by concatenating the heads.
out = rearrange(out, "b h n d -> b n (h d)")
print('out2 : ', out.shape)

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        emb_size: Size of the token embeddings; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``emb_size`` is not divisible by ``num_heads``.
    """

    def __init__(self, emb_size: int = 768, num_heads: int = 8, dropout: float = 0):
        super().__init__()
        if emb_size % num_heads != 0:
            raise ValueError(
                f"emb_size ({emb_size}) must be divisible by num_heads ({num_heads})"
            )
        self.emb_size = emb_size
        self.num_heads = num_heads
        # fuse the queries, keys and values in one matrix
        self.qkv = nn.Linear(emb_size, emb_size * 3)
        self.att_drop = nn.Dropout(dropout)
        self.projection = nn.Linear(emb_size, emb_size)
        # Keep landing on the GPU when one exists (the notebook never moves the
        # model itself), but fall back to the CPU instead of crashing.
        self.to("cuda" if torch.cuda.is_available() else "cpu")

    def forward(self, x : Tensor, mask: Tensor = None) -> Tensor:
        """Attend over the token axis of ``x``.

        Args:
            x: Tokens of shape ``(b, n, emb_size)``.
            mask: Optional boolean tensor broadcastable to ``(b, h, n, n)``;
                positions where it is ``False`` are excluded from attention.

        Returns:
            Tensor of shape ``(b, n, emb_size)``.
        """
        # split keys, queries and values in num_heads
        qkv = rearrange(self.qkv(x), "b n (h d qkv) -> (qkv) b h n d", h=self.num_heads, qkv=3)
        queries, keys, values = qkv[0], qkv[1], qkv[2]
        # dot product over the per-head feature axis
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) # batch, num_heads, query_len, key_len
        # Scale BEFORE the softmax, by sqrt of the per-head dimension, so the
        # attention weights of each query still sum to 1.
        energy = energy / (queries.shape[-1] ** 0.5)
        if mask is not None:
            # masked_fill is out-of-place, so the result must be kept; use the
            # minimum of the actual dtype so this also works in half precision.
            energy = energy.masked_fill(~mask, torch.finfo(energy.dtype).min)

        att = F.softmax(energy, dim=-1)
        att = self.att_drop(att)
        # weighted sum of the values over the key axis
        out = torch.einsum('bhal, bhlv -> bhav ', att, values)
        out = rearrange(out, "b h n d -> b n (h d)")
        return self.projection(out)

In [None]:
#@title Residual Block
class ResidualAdd(nn.Module):
    """Wrap ``fn`` with a skip connection: ``forward(x) = fn(x) + x``.

    Args:
        fn: Callable (typically an ``nn.Module``) applied to the input; its
            output must be broadcast-compatible with the input.
    """

    def __init__(self, fn):
        super().__init__()
        self.fn = fn

    def forward(self, x, **kwargs):
        """Return ``fn(x, **kwargs) + x`` without modifying ``x`` or ``fn``'s output."""
        # Out-of-place add: an in-place `+=` would overwrite fn's output, which
        # corrupts the input whenever fn returns it unchanged (e.g. an identity
        # or disabled layer) and can invalidate tensors saved for backward.
        return self.fn(x, **kwargs) + x

In [None]:
#@title FeedForward MLP
class FeedForwardBlock(nn.Sequential):
    """Position-wise MLP: Linear -> GELU -> Dropout -> Linear.

    The first layer expands the embedding size by a factor of ``expansion``;
    after GELU and Dropout the second Linear layer shrinks it back to
    ``emb_size``.

    Args:
        emb_size: Input and output feature size.
        expansion: Multiplier for the hidden layer width.
        drop_p: Dropout probability applied after the activation.
    """

    def __init__(self, emb_size: int, expansion: int = 4, drop_p: float = 0.):
        hidden_size = expansion * emb_size
        super().__init__(
            nn.Linear(emb_size, hidden_size),
            nn.GELU(),
            nn.Dropout(drop_p),
            nn.Linear(hidden_size, emb_size),
        )
        # Keep landing on the GPU when one exists, but fall back to the CPU
        # instead of crashing on machines without CUDA.
        self.to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#@title Transformer Encoder Block
class TransformerEncoderBlock(nn.Sequential):
    """One pre-norm encoder block: attention and MLP, each with a residual add.

    Each sub-layer is ``LayerNorm -> (MultiHeadAttention | FeedForwardBlock)
    -> Dropout`` wrapped in ``ResidualAdd``.

    Args:
        emb_size: Size of the token embeddings.
        drop_p: Dropout probability applied after each sub-layer.
        forward_expansion: Hidden-width multiplier of the feed-forward block.
        forward_drop_p: Dropout probability inside the feed-forward block.
        **kwargs: Forwarded to ``MultiHeadAttention`` (e.g. ``num_heads``).
    """

    def __init__(self,
                 emb_size: int = 768,
                 drop_p: float = 0.,
                 forward_expansion: int = 4,
                 forward_drop_p: float = 0.,
                 ** kwargs):
        super().__init__(
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                MultiHeadAttention(emb_size, **kwargs),
                nn.Dropout(drop_p),
            )),
            ResidualAdd(nn.Sequential(
                nn.LayerNorm(emb_size),
                FeedForwardBlock(
                    emb_size, expansion=forward_expansion, drop_p=forward_drop_p),
                nn.Dropout(drop_p),
            )),
        )
        # One move for the whole block: GPU when available (as before),
        # otherwise CPU instead of crashing on machines without CUDA.
        self.to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#@title Building Block
class TransformerEncoder(nn.Sequential):
    """Stack of ``depth`` identically configured ``TransformerEncoderBlock`` layers.

    Args:
        depth: Number of encoder blocks.
        **kwargs: Forwarded unchanged to every ``TransformerEncoderBlock``.
    """

    def __init__(self, depth: int = 12, **kwargs):
        blocks = (TransformerEncoderBlock(**kwargs) for _ in range(depth))
        # The blocks are unpacked with * because nn.Sequential takes each
        # module as a separate positional argument rather than as one list.
        super().__init__(*blocks)

In [None]:
#@title Head
class ClassificationHead(nn.Sequential):
    """Classifier head: mean over the token axis, LayerNorm, then Linear.

    Args:
        emb_size: Size of the token embeddings.
        n_classes: Number of output logits.
    """

    def __init__(self, emb_size: int = 768, n_classes: int = 10):
        super().__init__(
            # average all tokens: (b, n, e) -> (b, e)
            Reduce('b n e -> b e', reduction='mean'),
            nn.LayerNorm(emb_size),
            nn.Linear(emb_size, n_classes),
        )
        # Keep landing on the GPU when one exists, but fall back to the CPU
        # instead of crashing on machines without CUDA.
        self.to("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
#@title Summary
class ViT(nn.Sequential):
    """Vision Transformer: patch embedding -> encoder stack -> classification head.

    Args:
        in_channels: Number of channels of the input images.
        patch_size: Side length of a square patch.
        emb_size: Size of the token embeddings, shared by all three stages.
        img_size: Side length of the input images.
        depth: Number of encoder blocks.
        n_classes: Number of output logits.
        **kwargs: Forwarded to ``TransformerEncoder``.
    """

    def __init__(self,
                 in_channels: int = 3,
                 patch_size: int = 16,
                 emb_size: int = 768,
                 img_size: int = 224,
                 depth: int = 12,
                 n_classes: int = 10,
                 **kwargs):
        # Build the stages in pipeline order, then hand them to nn.Sequential.
        embedding = PatchEmbedding(in_channels, patch_size, emb_size, img_size)
        encoder = TransformerEncoder(depth, emb_size=emb_size, **kwargs)
        head = ClassificationHead(emb_size, n_classes)
        super().__init__(embedding, encoder, head)
        
# Summarise a freshly built default ViT for a single 3 x 224 x 224 input.
summary(ViT(), input_size=(3, 224, 224), device='cuda')

In [None]:
# Model that the training and evaluation cells operate on, built with the
# default ViT constructor arguments.
model = ViT()

In [None]:
# NOTE(review): `loss_function` is not referenced again in the visible notebook,
# and `optimizer` is rebound with lr=learning_rate in a later cell before the
# training loop runs, so the lr=0.01 optimizer created here is never stepped.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.01)

In [None]:
# Use the GPU when CUDA is usable; otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
import torchvision
import torchvision.transforms as transforms

In [None]:
# Preprocessing applied to every sample: convert to a tensor, then resize with
# size 224 so the images match the ViT default img_size of 224.
transform = transforms.Compose(
    [transforms.ToTensor(),
      transforms.Resize(224)])

In [None]:
# CIFAR-10 train and test splits, downloaded to ./data and preprocessed with
# `transform`.
# NOTE(review): `batch_size` is whatever value is bound when this cell runs;
# the hyperparameter cell that sets batch_size = 10 appears later in the
# notebook.
# NOTE(review): the training and evaluation loops iterate `train_loader` /
# `test_loader`, not the `trainloader` / `testloader` built here.
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform
                                        )
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                         shuffle=False, num_workers=2)

In [None]:
# Training hyperparameters used by the optimizer, data-loader and training cells.
learning_rate = 0.001
batch_size = 10 # originally 100, but reduced to 10 because of a size error, so accuracy will inevitably be lower
num_classes = 10
epochs = 5

In [None]:
# Cross-entropy loss moved to the selected device, and Adam over all model
# parameters with the notebook's learning_rate.
criterion = nn.CrossEntropyLoss()
criterion.to(device)
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Loaders used by the training and evaluation loops.
# The training data is reshuffled every epoch (matching `trainloader` above);
# the test data keeps its order.
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False)

In [None]:
from tqdm import tqdm

In [None]:
for epoch in range(epochs): # repeat for the configured number of epochs
    # Make sure training-only layers (dropout) are active even if the
    # evaluation cell was run before this one.
    model.train()
    avg_cost = 0.0

    for data, target in tqdm(train_loader):
        data = data.to(device)
        target = target.to(device)
        optimizer.zero_grad() # reset the gradients of all model parameters
        hypothesis = model(data) # forward pass
        cost = criterion(hypothesis, target) # loss between output and target
        cost.backward() # back-propagate to compute the gradients
        optimizer.step() # update the trainable parameters
        # Accumulate a plain Python float: adding the loss tensor itself would
        # keep its autograd history alive for the whole epoch.
        avg_cost += cost.item() / len(train_loader) # running mean over the batches
    print('[Epoch: {:>4}] cost = {:>.9}'.format(epoch + 1, avg_cost))

In [None]:
# Switch to evaluation mode so dropout layers use their inference behaviour.
model.eval()
correct, total = 0, 0

# Gradients are not needed for evaluation.
with torch.no_grad():
    for data, target in tqdm(test_loader):
        data, target = data.to(device), target.to(device)
        out = model(data)
        # The output holds one score per class; the predicted class is the
        # index of the highest score in each row.
        preds = torch.max(out.data, 1).indices
        total += target.size(0)  # number of samples seen so far
        correct += int((preds == target).sum())  # samples predicted correctly

print('Test Accuracy: ', 100.*correct/total, '%')


In [None]:
# Run the `nvidia-smi` shell command from the notebook.
!nvidia-smi