#### Library

In [1]:
import os
import math
import argparse
from functools import partial

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from tqdm import tqdm
from torch.nn.init import trunc_normal_
from transformers import ViTForImageClassification

Disabling PyTorch because PyTorch >= 2.1 is required but found 2.0.1


In [2]:
class VisionTransformerWithLinear(nn.Module):
    def __init__(self, base_vit, embed_dim=384, num_classes=10, **kwargs):
        super().__init__()
        self.base_vit = base_vit 
        self.fc = nn.Linear(embed_dim, num_classes) # Fc layer

    def forward(self, x):
        features = self.base_vit(x) # 이미지를 인코더로 입력
        features = torch.nn.functional.normalize(features, dim=-1) # 인코더의 출력을 정규화
        logits = self.fc(features)  # 분류 헤드를 거쳐 최종 로짓을 계산
        return logits

In [3]:
class VisionTransformer(nn.Module):
    def __init__(
            self,
            img_size=[224],
            patch_size=16,
            in_chans=3,
            num_classes=0,
            embed_dim=768,
            depth=12,
            num_heads=12,
            mlp_ratio=4.,
            drop_rate=0.,
            attn_drop_rate=0.,
            drop_path_rate=0.,
            norm_layer=nn.LayerNorm,
            **kwargs
        ):

        super().__init__()

        # Embedding size
        self.embed_dim = embed_dim

        # From image to patch
        self.patch_embed = PatchEmbed(img_size=img_size[0], patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim)

        # 인풋 image와 patch 크기로부터 계산된 patch 갯수 (224/16 = 14 -> 가로, 세로 14개씩 patch로 총 196개의 patch가 생성)
        num_patches = self.patch_embed.num_patches

        # CLS Token 생성, 학습가능한 파라미터로 설정 (1개 patch)
        # 일단 batch size를 1로 설정 후 이후에 batch size에 맞게 broadcast
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))

        # Positional embedding 생성, 학습가능한 파라미터 설정 (각 patch + class token)
        self.pos_embed = nn.Parameter(torch.zeros(1, num_patches + 1, embed_dim))

        # Postional embedidng 결과에 dropout을 적용하기 위함
        self.pos_drop = nn.Dropout(p=drop_rate)

        # Drop 확률을 depth에 따라 차등적용
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)]

        # Attention blocks
        self.blocks = nn.ModuleList([
            Block(
                dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio,
                drop=drop_rate, attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer)
            for i in range(depth)])

        self.norm = norm_layer(embed_dim)

        # Class token과 positional embedding의 parameter 초기화
        trunc_normal_(self.pos_embed, std=.02)
        trunc_normal_(self.cls_token, std=.02)

        # Layernorm과 classification head의 parameter 초기화
        self.apply(self._init_weights)

    def _init_weights(self, m):
        """Layernorm과 classification head의 parameter 초기화하는 함수"""

        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    def interpolate_pos_encoding(self, x, w, h):
        """Pre-trained된 positional encoding을 interpolation을 사용하여 고해상도 이미지에 적용하기 위한 함수"""

        npatch = x.shape[1] - 1
        N = self.pos_embed.shape[1] - 1

        # Interpolation 할 필요가 없으면 interpoltation 없이 postional embedding 적용
        if npatch == N and w == h:
            return self.pos_embed

        # Interpolation을 위한 코드
        class_pos_embed = self.pos_embed[:, 0]
        patch_pos_embed = self.pos_embed[:, 1:]
        dim = x.shape[-1]
        w0 = w // self.patch_embed.patch_size
        h0 = h // self.patch_embed.patch_size
        w0, h0 = w0 + 0.1, h0 + 0.1
        patch_pos_embed = nn.functional.interpolate(
            patch_pos_embed.reshape(1, int(math.sqrt(N)), int(math.sqrt(N)), dim).permute(0, 3, 1, 2),
            scale_factor=(w0 / math.sqrt(N), h0 / math.sqrt(N)),
            mode='bicubic',
        )
        assert int(w0) == patch_pos_embed.shape[-2] and int(h0) == patch_pos_embed.shape[-1]
        patch_pos_embed = patch_pos_embed.permute(0, 2, 3, 1).view(1, -1, dim)
        return torch.cat((class_pos_embed.unsqueeze(0), patch_pos_embed), dim=1)

    def prepare_tokens(self, x, interpolate_pos_encoding=False):
        """이미지 embedding후 classification token과 positional embedding을 추가로 적용하는 함수"""

        # (batch, number of channel, width, height)
        B, nc, w, h = x.shape

        # 이미지를 토큰화후 embedding
        x = self.patch_embed(x)  # patch linear embedding

       # [CLS] 토큰을 batch size에 맞게 확장
        cls_tokens = self.cls_token.expand(B, -1, -1)

        # [CLS] 토큰을 embedding patch에 추가
        x = torch.cat((cls_tokens, x), dim=1)

        # add positional encoding to each token
        x = x + self.interpolate_pos_encoding(x, w, h)

        # positional embedding 결과에 dropout 적용
        x = self.pos_drop(x)

        return x

    def forward(self, x, return_all_patches=False):
        # 이미지 -> token -> embedding
        x = self.prepare_tokens(x)

        # 정해진 depth만큼 encoder block 적용
        for blk in self.blocks:
            x = blk(x)

        # encoder output을 normalization
        x = self.norm(x)

        # 학습된 모든 patch가 필요한 경우 모든 patch의 latent feature map을 반환
        if return_all_patches:
            return x
        # CLS Token의 latent feature map만 반환
        else:
            return x[:, 0]

In [4]:
def vit_small(patch_size=16, **kwargs):
    model = VisionTransformer(
        patch_size=patch_size,
        embed_dim=384,
        depth=12,
        num_heads=6,
        mlp_ratio=4,
        norm_layer=partial(nn.LayerNorm, eps=1e-6),
        **kwargs
    )
    return model

def vit_base(patch_size=16, **kwargs):
    model = VisionTransformer(
        patch_size=patch_size,
        embed_dim=768,
        depth=12,
        num_heads=12,
        mlp_ratio=4,
        norm_layer=partial(nn.LayerNorm, eps=1e-6),
        **kwargs
    )
    return model

In [5]:
class PatchEmbed(nn.Module):
    """ 이미지를 Patch로 나누기 위한 class"""
    def __init__(self, img_size=224, patch_size=16, in_chans=3, embed_dim=768):
        super().__init__()
        num_patches = (img_size // patch_size) * (img_size // patch_size)
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = num_patches

        # Convolution filter와 stride를 이용하여 이미지를 patch화
        #   - filter의 크기 = patch의 크기
        #   - stride = patch의 크기
        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)

    def forward(self, x):
        B, C, H, W = x.shape
        x = self.proj(x).flatten(2).transpose(1, 2)
        return x

In [6]:
class Attention(nn.Module):
    """Attention 연산을 수행하는 클래스"""

    def __init__(self, dim, num_heads=8, attn_drop=0., proj_drop=0.):
        super().__init__()
        self.num_heads = num_heads
        head_dim = dim // num_heads

        # Attention score를 계산할 때 softmax의 분모에 해당하는 부분
        self.scale = head_dim ** -0.5

        # Embedding된 token을 각각 Q, K, V로 mapping
        self.qkv = nn.Linear(dim, dim * 3)

        # Attention에 dropout 적용
        self.attn_drop = nn.Dropout(attn_drop)

        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

    def forward(self, x):
        B, N, C = x.shape
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]

        # Attention score 계산 (softmax 부분) 후 dropout 적용
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)

        # Value를 반영한 최종 Attention score 계산 후 projection + dropout
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

In [7]:
class Block(nn.Module):
    """Transformer의 encoder block"""

    def __init__(self, dim, num_heads, mlp_ratio=4., drop=0., attn_drop=0.,
                 drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim, num_heads=num_heads,  attn_drop=attn_drop, proj_drop=drop
        )
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(
            in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop
        )

    def forward(self, x):
        """Positional embedding까지 적용된 latent feature를 input으로 받음"""

        # normalization 후 attention score 계산
        y = self.attn(self.norm1(x))
        x = x + self.drop_path(y)
        x = x + self.drop_path(self.mlp(self.norm2(x)))

        return x

def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Dropout과 같이 overfitting을 막기위한 조치, training 단계에서만 적용"""

    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1) # work with diff dim tensors, not just 2D ConvNets
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_() # binarize
    output = x.div(keep_prob) * random_tensor
    return output

class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample  (when applied in main path of residual blocks)."""
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)

class Mlp(nn.Module):
    """Transformer encoder의 feed forward"""
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x

In [8]:
# CIFAR10 dataset 전처리
transform = transforms.Compose([
    transforms.Resize((224, 224)), # ViT expects images of size 224x224
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR10의 training, test set 다운로드
train_dataset = torchvision.datasets.CIFAR10(root='../data/cifar10', train=True, download=True, transform=transform)
test_dataset = torchvision.datasets.CIFAR10(root='../data/cifar10', train=False, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=512, shuffle=False)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ../data/cifar10/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:36<00:00, 4632346.61it/s] 


Extracting ../data/cifar10/cifar-10-python.tar.gz to ../data/cifar10
Files already downloaded and verified


In [10]:
# model 정의
method = 'custom' # or hugging
pretrain = True

if method == 'custom':
    # 위에서 정의한 ViT 모델
    encoder = vit_small()
    if pretrain:
        state_dict = torch.load("../data/dino_deitsmall16_pretrain.pth")
        encoder.load_state_dict(state_dict)
        model = VisionTransformerWithLinear(encoder).cuda()
    else :
        model = VisionTransformerWithLinear(encoder).cuda()

elif method == 'hugging':
    # huggingface에 준비된 모델
    if pretrain:
        model = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224-in21k', num_labels=10).cuda()
    else:
        model = ViTForImageClassification().cuda()

# loss function
criterion = nn.CrossEntropyLoss()

# optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

AssertionError: Torch not compiled with CUDA enabled

In [None]:
num_epochs = 2

# start training
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    cnt = 0
    for i, data in tqdm(enumerate(train_loader), total=len(train_loader)):
        inputs, labels = data[0].cuda(), data[1].cuda()

        optimizer.zero_grad()

        if method == "custom":
            outputs = model(inputs)
        else:
            outputs = model(inputs).logits
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        cnt += len(inputs)
        if i==10:
            break
    if epoch % 1 == 0:  # print every 5 epochs
        print(f"[Training loss at epoch {epoch + 1}]: {running_loss / len(train_loader):.4f}")

print("Training completed.")

In [None]:
# Evaluation
correct = 0
total = 0

with torch.no_grad():
    model.eval()
    for data in tqdm(test_loader):
        images, labels = data[0].cuda(), data[1].cuda()
        outputs = model(images)

        if method == "custom":
            _, predicted = torch.max(outputs, 1)
        else:
            _, predicted = torch.max(outputs.logits, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        break
print(f'Accuracy on the 10000 test images: {100 * correct / total}%')