In [2]:
!pip install einops

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/44.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.7.0


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from einops import rearrange, reduce, repeat
from einops.layers.torch import Rearrange, Reduce

from torchsummary import summary

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [4]:
import torchvision
import torchvision.transforms as transfroms
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
import os
import pandas as pd
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from tqdm.auto import tqdm

In [28]:
BATCH_SIZE = 64  # samples per batch for the train and validation DataLoaders
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer
EPOCHS = 3  # number of full passes over the training set

# Data Import / Preprocessing

In [6]:
from zipfile import ZipFile

# Unpack the dataset archive stored on Google Drive into a local 'fruit_classification' folder.
# The target is a relative path; later cells read from '/content/fruit_classification',
# so this presumably runs with /content as the working directory (Colab default) - TODO confirm.
with ZipFile('/content/drive/MyDrive/Pytorch_Dataset/fruit_classification.zip', 'r') as zipObj:
    zipObj.extractall('fruit_classification')

In [7]:
def get_dataframe(folder_path, state):
    """Build a DataFrame of image paths (and labels) found under ``folder_path``.

    Args:
        folder_path: Directory to scan. For ``state == 'train'`` it must contain
            one sub-directory per class; for ``state == 'test'`` it contains the
            image files directly.
        state: Either ``'train'`` or ``'test'``.

    Returns:
        For ``'train'``: a DataFrame with columns ``path`` and ``label``, where
        the label is the name of the class sub-directory.
        For ``'test'``: a DataFrame with the single column ``path``.

    Raises:
        ValueError: If ``state`` is neither ``'train'`` nor ``'test'``.
    """
    if state not in ('train', 'test'):
        raise ValueError(f"state must be 'train' or 'test', got {state!r}")

    # os.listdir returns entries in arbitrary order; sorting keeps the row order
    # (and therefore the seeded train/valid split and the submission ids) reproducible.
    entries = sorted(os.listdir(folder_path))

    if state == 'train':
        rows = []
        for class_name in entries:
            class_dir = os.path.join(folder_path, class_name)
            # Skip stray files next to the class folders instead of crashing on listdir.
            if not os.path.isdir(class_dir):
                continue
            rows.extend(
                [os.path.join(class_dir, file_name), class_name]
                for file_name in sorted(os.listdir(class_dir))
            )
        return pd.DataFrame(rows, columns = ['path', 'label'])

    test_paths = [os.path.join(folder_path, file_name) for file_name in entries]
    return pd.DataFrame(test_paths, columns = ['path'])


In [8]:
df = get_dataframe('/content/fruit_classification/train/train', 'train')
CLASS_NUM = len(df['label'].unique())

In [9]:
LabelEncoder = preprocessing.LabelEncoder()
df['label'] = LabelEncoder.fit_transform(df['label'].values)

In [10]:
# Hold out 20% for validation. Stratifying on the label keeps the class proportions
# the same in both splits; the seed makes the split reproducible.
train_df, valid_df = train_test_split(df, test_size = 0.2, random_state = 10, stratify = df['label'])

In [11]:
class CustomDataset(Dataset):
    """Dataset that loads images from disk with OpenCV and optionally augments them.

    Args:
        data_path: Indexable sequence of image file paths.
        labels: Indexable sequence of labels aligned with ``data_path``, or
            ``None`` for unlabeled (test) data.
        transformer: Optional callable invoked as ``transformer(image=img)`` that
            returns a mapping with an ``'image'`` entry.
    """

    def __init__(self, data_path, labels, transformer = None):
        self.path = data_path
        self.labels = labels
        self.transformer = transformer

    def __len__(self):
        """Return the number of image paths."""
        return len(self.path)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, label)`` when labels were given, otherwise just ``image``.

        Raises:
            FileNotFoundError: If the image at the stored path cannot be read.
        """
        path = self.path[idx]
        image = cv.imread(path)
        # cv.imread signals failure by returning None instead of raising; fail here
        # with the offending path rather than with an opaque error inside cvtColor.
        if image is None:
            raise FileNotFoundError(f'Could not read image: {path}')
        # OpenCV loads images as BGR; convert to RGB channel order.
        image = cv.cvtColor(image, cv.COLOR_BGR2RGB)

        if self.transformer is not None:
            image = self.transformer(image = image)['image']

        if self.labels is not None:
            return image, self.labels[idx]
        return image


In [12]:
# Preprocessing shared by the train, validation and test datasets:
# resize to the 224x224 input the ViT expects, normalize, and convert to a tensor.
df_transformer = A.Compose([
    A.Resize(224, 224),
    # NOTE(review): with mean = std = 0.5 and max_pixel_value = 255 this presumably maps
    # 0..255 pixel values to roughly [-1, 1] - confirm against the albumentations docs.
    A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5), max_pixel_value = 255.0, always_apply = False, p=1.0),
    ToTensorV2()
])

In [13]:
train_dataset = CustomDataset(train_df['path'].values, train_df['label'].values, df_transformer)
train_loader = DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle = True, num_workers = 0)

In [14]:
valid_dataset = CustomDataset(valid_df['path'].values, valid_df['label'].values, df_transformer)
# No shuffling for evaluation: the reported loss is a mean of per-batch means, so a fixed
# batch composition keeps the validation loss comparable between epochs and runs.
valid_loader = DataLoader(valid_dataset, batch_size = BATCH_SIZE, shuffle = False, num_workers = 0)

# Model - Vision Transformer

In [17]:
class PatchEmbedding(nn.Module):
    """Turn an image batch into a sequence of patch embeddings.

    The image is cut into non-overlapping ``patch_size`` x ``patch_size`` patches,
    each patch is projected to ``emb_size`` features, a learnable class token is
    prepended, and a learnable position embedding is added.

    Args:
        in_channels: Number of input channels (3 for RGB).
        patch_size: Side length of one square patch.
        emb_size: Embedding size per patch (768 = 3 * 16 * 16 with the defaults).
        img_size: Side length of the (square) input image.
    """

    def __init__(self, in_channels=3, patch_size=16, emb_size=768, img_size=224):
        super().__init__()

        self.patch_size = patch_size

        # A convolution whose kernel and stride both equal the patch size projects every
        # patch in one step; the original author notes this performs better than a linear layer.
        patch_conv = nn.Conv2d(in_channels, emb_size, kernel_size=patch_size, stride=patch_size)
        # (b, e, h, w) -> (b, n, e), where n = h * w is the number of patches.
        to_sequence = Rearrange('b e (h) (w) -> b (h w) e')
        self.projection = nn.Sequential(patch_conv, to_sequence)

        num_patches = (img_size // patch_size) ** 2
        # Learnable class embedding of shape (1, 1, emb_size).
        self.cls_token = nn.Parameter(torch.randn(1, 1, emb_size))
        # One position vector per patch plus one for the class token at position 0.
        self.positions = nn.Parameter(torch.randn(num_patches + 1, emb_size))

    def forward(self, x):
        """Embed ``x`` and return a (batch, num_patches + 1, emb_size) tensor."""
        batch_size = x.shape[0]
        patches = self.projection(x)

        # Repeat the single class token once per sample in the batch.
        cls_tokens = repeat(self.cls_token, '() n e -> b n e', b=batch_size)
        # The class token goes in front of the patch sequence.
        tokens = torch.cat([cls_tokens, patches], dim=1)

        # Add the position embedding to every token (broadcast over the batch).
        return tokens + self.positions



In [18]:
# Check PatchEmbedding
# Sanity check: push a random batch of 16 images through PatchEmbedding and print the output shape.
x = torch.randn(16, 3, 224, 224).to(device)
patch_embedding = PatchEmbedding().to(device)
patch_output = patch_embedding(x)  # reused as input for the MultiHeadAttention check
print('[batch, 1+num of patches, emb_size] = ', patch_output.shape)

[batch, 1+num of patches, emb_size] =  torch.Size([16, 197, 768])


In [19]:
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        hidden_dim: Embedding size of each token; must be divisible by ``n_heads``.
        n_heads: Number of attention heads. Each head runs scaled dot-product
            attention on ``hidden_dim // n_heads`` features; the head outputs are
            concatenated afterwards.
        dropout_ratio: Dropout probability applied to the attention weights.
    """

    def __init__(self, hidden_dim=768, n_heads=8, dropout_ratio=0):
        super().__init__()

        assert hidden_dim % n_heads == 0, 'hidden_dim must be divisible by n_heads'

        self.hidden_dim = hidden_dim
        self.n_heads = n_heads
        self.head_dim = hidden_dim // n_heads

        self.fc_q = nn.Linear(hidden_dim, hidden_dim)
        self.fc_k = nn.Linear(hidden_dim, hidden_dim)
        self.fc_v = nn.Linear(hidden_dim, hidden_dim)

        self.fc_o = nn.Linear(hidden_dim, hidden_dim)

        self.dropout = nn.Dropout(dropout_ratio)

        # Plain Python float instead of a tensor pinned to a global device: a tensor
        # attribute that is not a buffer does not follow model.to(...), so the module
        # would break as soon as it is moved to a different device.
        self.scale = self.head_dim ** 0.5

    def forward(self, x, mask = None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape (batch, tokens, hidden_dim).
            mask: Optional tensor broadcastable to (batch, heads, tokens, tokens);
                positions where ``mask == 0`` are excluded from attention.

        Returns:
            Tensor of shape (batch, tokens, hidden_dim).
        """
        # Split q, k, v into heads; d is the per-head dimension.
        # With the defaults: (b, 197, 768) -> (b, 8, 197, 96).
        queries = rearrange(self.fc_q(x), 'b n (h d) -> b h n d', h = self.n_heads)
        keys = rearrange(self.fc_k(x), 'b n (h d) -> b h n d', h = self.n_heads)
        values = rearrange(self.fc_v(x), 'b n (h d) -> b h n d', h = self.n_heads)

        # Query-key dot products: (batch, heads, query_len, key_len).
        energy = torch.einsum('bhqd, bhkd -> bhqk', queries, keys) / self.scale

        if mask is not None:
            # Use the most negative value of the current dtype; a hard-coded -1e10
            # is not representable in float16.
            energy = energy.masked_fill(mask == 0, torch.finfo(energy.dtype).min)

        attention = F.softmax(energy, dim=-1)
        attention = self.dropout(attention)

        # Weighted sum of the values, per head: (b, heads, tokens, head_dim).
        out = torch.einsum('bhal, bhlv -> bhav', attention, values)
        # Concatenate the heads back into hidden_dim features.
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.fc_o(out)

In [20]:
# Check MultiHeadAttention
# Sanity check: attention must keep the (batch, tokens, hidden_dim) shape of its input.
MHA = MultiHeadAttention().to(device)
MHA_output = MHA(patch_output)
print(MHA_output.shape)

torch.Size([16, 197, 768])


In [21]:
class FeedForward(nn.Module):
    """Position-wise MLP: Linear -> GELU -> Dropout -> Linear.

    Args:
        hidden_dim: Size of the input and output features.
        expansion: Multiplier for the width of the inner layer.
        dropout_ratio: Dropout probability applied after the activation.
    """

    def __init__(self, hidden_dim, expansion=4, dropout_ratio=0):
        super().__init__()

        inner_dim = expansion * hidden_dim

        self.fc_1 = nn.Linear(hidden_dim, inner_dim)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(dropout_ratio)
        self.fc_2 = nn.Linear(inner_dim, hidden_dim)

    def forward(self, x):
        """Expand, activate, drop out, then project back to ``hidden_dim``."""
        hidden = self.dropout(self.gelu(self.fc_1(x)))
        return self.fc_2(hidden)


In [22]:
class EncoderBlock(nn.Module):
    """One pre-norm Transformer encoder block: attention and MLP, each with a residual.

    Args:
        hidden_dim: Token embedding size.
        n_heads: Number of attention heads.
        dropout_ratio: Dropout applied to the output of each sub-layer before
            it is added to the residual.
        expansion: Width multiplier of the feed-forward inner layer.
        forward_dropout: Dropout used inside the feed-forward layer.
            NOTE(review): the same value is also passed as the attention-weight
            dropout - confirm that this is intended.
    """

    def __init__(self, hidden_dim=768, n_heads=8, dropout_ratio = 0, expansion = 4, forward_dropout = 0):
        super().__init__()

        self.attention_norm = nn.LayerNorm(hidden_dim)
        self.multiheadattention = MultiHeadAttention(hidden_dim = hidden_dim, n_heads = n_heads, dropout_ratio = forward_dropout)
        self.feedforward_norm = nn.LayerNorm(hidden_dim)
        self.feedforward = FeedForward(hidden_dim, expansion = expansion, dropout_ratio = forward_dropout)
        self.dropout = nn.Dropout(dropout_ratio)

    def forward(self, x, mask = None):
        """Run the block on ``x``.

        Args:
            x: Tensor of shape (batch, tokens, hidden_dim).
            mask: Optional attention mask, forwarded to the attention layer.

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Attention sub-layer. The mask is now passed through; previously it was
        # accepted by this method but silently dropped.
        attended = self.attention_norm(x)
        attended = self.multiheadattention(attended, mask)
        attended = self.dropout(attended)
        y = x + attended

        # Feed-forward sub-layer.
        z = self.feedforward_norm(y)
        z = self.feedforward(z)
        z = self.dropout(z)

        return y + z



In [23]:
class TransformerEncoder(nn.Module):
    """A stack of ``n_layers`` EncoderBlock modules applied one after another.

    Args:
        input_dim: Not used by this class; kept so existing callers keep working.
        hidden_dim: Token embedding size.
        n_layers: Number of encoder blocks.
        n_heads: Number of attention heads per block.
        expansion: Width multiplier of each block's feed-forward layer.
        dropout_ratio: Residual dropout of each block.
        forward_dropout: Feed-forward dropout of each block.
    """

    def __init__(self, input_dim, hidden_dim=768, n_layers=12, n_heads=8, expansion=4, dropout_ratio=0, forward_dropout=0):
        super().__init__()

        self.layers = nn.ModuleList([EncoderBlock(hidden_dim, n_heads, dropout_ratio, expansion, forward_dropout) for _ in range(n_layers)])

    def forward(self, x, mask = None):
        """Pass ``x`` through every block in order, handing ``mask`` to each.

        ``mask`` defaults to ``None`` so it is optional here, as it already is in
        ``MultiHeadAttention.forward``.
        """
        for layer in self.layers:
            x = layer(x, mask)

        return x

In [24]:
class ClassificationHead(nn.Module):
    """Pool the token sequence and map it to class logits.

    Args:
        hidden_dim: Token embedding size.
        class_num: Number of output classes.
    """

    def __init__(self, hidden_dim=768, class_num = 10):
        super().__init__()

        self.norm = nn.LayerNorm(hidden_dim)
        self.fc = nn.Linear(hidden_dim, class_num)

    def forward(self, x):
        """Return logits of shape (batch, class_num) for ``x`` of shape (batch, tokens, hidden_dim)."""
        # Global average pooling over the token axis.
        pooled = reduce(x, 'b n e -> b e', 'mean')
        return self.fc(self.norm(pooled))

In [25]:
class ViT(nn.Module):
    """Vision Transformer: patch embedding -> Transformer encoder -> classification head.

    Args:
        in_channels: Number of image channels.
        patch_size: Side length of one square patch.
        hidden_dim: Token embedding size used throughout the model.
        img_size: Side length of the (square) input image.
        n_layers: Number of encoder blocks.
        class_num: Number of output classes.
        n_heads: Number of attention heads per block.
        expansion: Width multiplier of the feed-forward layers.
        dropout_ratio: Residual dropout inside the encoder blocks.
        forward_dropout: Feed-forward dropout inside the encoder blocks.
    """

    def __init__(self, in_channels = 3, patch_size = 16, hidden_dim = 768,
                 img_size = 224, n_layers = 12, class_num = 10, n_heads = 8, expansion = 4, dropout_ratio = 0, forward_dropout = 0):
        super().__init__()

        self.patchembedding = PatchEmbedding(
            in_channels=in_channels,
            patch_size=patch_size,
            emb_size=hidden_dim,
            img_size=img_size,
        )
        self.transformerencoder = TransformerEncoder(
            input_dim=hidden_dim,
            hidden_dim=hidden_dim,
            n_layers=n_layers,
            n_heads=n_heads,
            expansion=expansion,
            dropout_ratio=dropout_ratio,
            forward_dropout=forward_dropout,
        )
        self.classificationhead = ClassificationHead(hidden_dim=hidden_dim, class_num=class_num)

    def forward(self, x):
        """Return class logits for an image batch ``x``."""
        tokens = self.patchembedding(x)
        # No attention mask is used for image patches.
        encoded = self.transformerencoder(tokens, None)
        return self.classificationhead(encoded)

In [41]:
# Layer-by-layer summary of a ViT built with the default arguments (class_num = 10).
# This instance is only for inspection; the model that gets trained is created in the Train section.
model = ViT().to(device)
summary(model, (3,224,224), device=device.type)

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 768, 14, 14]         590,592
         Rearrange-2             [-1, 196, 768]               0
    PatchEmbedding-3             [-1, 197, 768]               0
         LayerNorm-4             [-1, 197, 768]           1,536
            Linear-5             [-1, 197, 768]         590,592
            Linear-6             [-1, 197, 768]         590,592
            Linear-7             [-1, 197, 768]         590,592
           Dropout-8          [-1, 8, 197, 197]               0
            Linear-9             [-1, 197, 768]         590,592
MultiHeadAttention-10             [-1, 197, 768]               0
          Dropout-11             [-1, 197, 768]               0
        LayerNorm-12             [-1, 197, 768]           1,536
           Linear-13            [-1, 197, 3072]       2,362,368
             GELU-14            [-1, 1

# Train

In [15]:
def train(model, epochs, optimizer, train_loader, test_loader, scheduler, device):
    """Train ``model`` and return it with the weights of the best validation epoch.

    Args:
        model: Network to train; moved to ``device`` in place.
        epochs: Number of passes over ``train_loader``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(img, label)`` training batches.
        test_loader: Iterable of ``(img, label)`` batches used for validation.
        scheduler: Optional LR scheduler stepped once per epoch, or ``None``.
        device: Device to run on.

    Returns:
        ``model`` loaded with the parameters from the epoch with the lowest
        validation loss, or ``None`` if no epoch produced a validation loss
        (e.g. ``epochs == 0``).
    """
    model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)

    best_score = float('inf')
    best_state = None

    for epoch in range(1, epochs + 1):
        model.train()
        train_loss = []
        for img, label in tqdm(train_loader):
            img, label = img.float().to(device), label.to(device)

            optimizer.zero_grad()

            model_pred = model(img)
            loss = criterion(model_pred, label)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        tr_loss = np.mean(train_loss)
        val_loss = validation(model, criterion, test_loader, device)

        print(f'Epoch [{epoch}], Train Loss : [{tr_loss:.5f}] Val Loss : [{val_loss:.5f}]')

        if scheduler is not None:
            scheduler.step()

        if val_loss < best_score:
            best_score = val_loss
            # Snapshot the weights. Keeping a reference to `model` would not work:
            # later epochs keep updating the same object, so the "best" model would
            # always be the last one. Copies live on the CPU to spare device memory.
            best_state = {name: tensor.detach().cpu().clone() for name, tensor in model.state_dict().items()}

    if best_state is None:
        return None

    # Restore the best weights; load_state_dict copies them onto the model's device.
    model.load_state_dict(best_state)
    return model


In [16]:
def validation(model, criterion, test_loader, device):
    """Return the mean per-batch loss of ``model`` over ``test_loader``.

    The model is switched to eval mode and gradients are disabled for the pass.
    """
    model.eval()

    batch_losses = []

    with torch.no_grad():
        for img, label in tqdm(iter(test_loader)):
            img = img.float().to(device)
            label = label.to(device)

            batch_loss = criterion(model(img), label)
            batch_losses.append(batch_loss.item())

    return np.mean(batch_losses)

In [29]:
# Build the model that is actually trained; class_num comes from the labels found on disk.
# (The former model.eval() call here had no effect: train() switches the model to
# train mode at the start of every epoch.)
model = ViT(dropout_ratio = 0.4, forward_dropout = 0.4, class_num = CLASS_NUM)
optimizer = torch.optim.Adam(params=model.parameters(), lr = LEARNING_RATE)
scheduler = None

infer_model = train(model, EPOCHS, optimizer = optimizer, train_loader = train_loader, test_loader = valid_loader, scheduler = scheduler, device = device)

  0%|          | 0/211 [00:00<?, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch [1], Train Loss : [1.35077] Val Loss : [1.33770]


  0%|          | 0/211 [00:00<?, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch [2], Train Loss : [0.98550] Val Loss : [1.52271]


  0%|          | 0/211 [00:00<?, ?it/s]

  0%|          | 0/53 [00:00<?, ?it/s]

Epoch [3], Train Loss : [0.87894] Val Loss : [1.26320]


# Inference

In [30]:
torch.save(infer_model.state_dict(), '/content/drive/MyDrive/Pytorch_Models/ViT.pth')

In [31]:
test_df = get_dataframe('/content/fruit_classification/test/test', 'test')

In [32]:
test_dataset = CustomDataset(test_df['path'].values, None, df_transformer)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0)

In [33]:
def inference(model, test_loader, device):
    """Predict a class index for every image yielded by ``test_loader``.

    Args:
        model: Trained network; moved to ``device`` and put in eval mode.
        test_loader: Iterable of image batches (no labels).
        device: Device to run on.

    Returns:
        A flat list with one predicted class index per image, in loader order.
    """
    model.to(device)
    model.eval()

    predictions = []

    with torch.no_grad():
        for img in tqdm(iter(test_loader)):
            logits = model(img.float().to(device))
            # The arg-max over the class axis is the predicted label index.
            batch_preds = logits.argmax(1).detach().cpu().numpy().tolist()
            predictions.extend(batch_preds)

    print('Done.')
    return predictions

In [34]:
preds = inference(infer_model, test_loader, device)

  0%|          | 0/89 [00:00<?, ?it/s]

Done.


In [35]:
preds = LabelEncoder.inverse_transform(preds)

In [36]:
# Pair every prediction with its running index: [[0, label_0], [1, label_1], ...].
pred_list = [[row_id, label] for row_id, label in enumerate(preds)]

In [37]:
submission = pd.DataFrame(pred_list, columns = ['id', 'label'])
submission.head(5)

Unnamed: 0,id,label
0,0,Grape Blue
1,1,Pear
2,2,Pepper Red
3,3,Pineapple
4,4,Banana
