In [1]:
!pip install timm



In [2]:
import torch
import torch.nn as nn
import torch.utils.checkpoint as checkpoint
from timm.models.layers import DropPath, to_2tuple, trunc_normal_

In [3]:
import torch
from torchvision import transforms
from torch.utils.data import Dataset
import torch.nn.functional as F
from torch.autograd import Function
import torch.utils.model_zoo as model_zoo

from easydict import EasyDict

import os
from glob import glob
import pickle
import random
import math

import cv2
import numpy as np
import pandas as pd

In [4]:
class Mlp(nn.Module) : 
  def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.) :
    super().__init__()
    out_features = out_features if out_features is not None else in_features
    hidden_features = hidden_features if hidden_features is not None else in_features
    
    self.fc1 = nn.Linear(in_features, hidden_features)
    self.act = act_layer()
    self.fc2 = nn.Linear(hidden_features, out_features)
    self.drop = nn.Dropout(drop)

  def forward(self, x) :
    x = self.act(self.fc1(x))
    x = self.drop(x)
    x = self.drop(self.fc2(x))
    return x

In [5]:
def window_partition(x, window_size) : 
  batch, H, W, C = x.shape # (n, 64, 64, 1)
  x = x.view(batch, H // window_size, window_size, W // window_size, window_size, C) 
  # print("333333---------", x.shape)
  # window 크기만큼 patch를 나누고 나눈 패치들을 이어 붙인다
  windows = x.permute(0,1,3,2,4,5).contiguous().view(-1, window_size, window_size, C) 
  # print("444444---------", windows.shape)
  return windows

In [6]:
def window_reverse(windows, window_size, H, W):
    """
    Args:
        windows: (num_windows*B, window_size, window_size, C)
        window_size (int): Window size
        H (int): Height of image
        W (int): Width of image
    Returns:
        x: (B, H, W, C)
    """
    B = int(windows.shape[0] / (H * W / window_size / window_size))
    x = windows.view(B, H // window_size, W // window_size, window_size, window_size, -1)
    x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1)
    return x

In [7]:
class WindowAttention(nn.Module):
    r""" Window based multi-head self attention (W-MSA) module with relative position bias.
    It supports both of shifted and non-shifted window.
    Args:
        dim (int): Number of input channels.
        window_size (tuple[int]): The height and width of the window.
        num_heads (int): Number of attention heads.
        qkv_bias (bool, optional):  If True, add a learnable bias to query, key, value. Default: True
        qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set
        attn_drop (float, optional): Dropout ratio of attention weight. Default: 0.0
        proj_drop (float, optional): Dropout ratio of output. Default: 0.0
    """

    def __init__(self, dim, window_size, num_heads, qkv_bias=True, qk_scale=None, attn_drop=0., proj_drop=0.):

        super().__init__()
        self.dim = dim
        self.window_size = window_size  # Wh, Ww
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = qk_scale or head_dim ** -0.5

        # define a parameter table of relative position bias
        self.relative_position_bias_table = nn.Parameter(
            torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads))  # 2*Wh-1 * 2*Ww-1, nH

        # get pair-wise relative position index for each token inside the window
        coords_h = torch.arange(self.window_size[0])
        coords_w = torch.arange(self.window_size[1])
        coords = torch.stack(torch.meshgrid([coords_h, coords_w]))  # 2, Wh, Ww
        coords_flatten = torch.flatten(coords, 1)  # 2, Wh*Ww
        relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :]  # 2, Wh*Ww, Wh*Ww
        relative_coords = relative_coords.permute(1, 2, 0).contiguous()  # Wh*Ww, Wh*Ww, 2
        relative_coords[:, :, 0] += self.window_size[0] - 1  # shift to start from 0
        relative_coords[:, :, 1] += self.window_size[1] - 1
        relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1
        relative_position_index = relative_coords.sum(-1)  # Wh*Ww, Wh*Ww
        self.register_buffer("relative_position_index", relative_position_index)

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

        trunc_normal_(self.relative_position_bias_table, std=.02)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x, mask=None):
        """
        Args:
            x: input features with shape of (num_windows*B, N, C)
            mask: (0/-inf) mask with shape of (num_windows, Wh*Ww, Wh*Ww) or None
        """
        B_, N, C = x.shape
        qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv[0], qkv[1], qkv[2]  # make torchscript happy (cannot use tensor as tuple)

        q = q * self.scale
        attn = (q @ k.transpose(-2, -1))

        relative_position_bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view(
            self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1)  # Wh*Ww,Wh*Ww,nH
        relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous()  # nH, Wh*Ww, Wh*Ww
        attn = attn + relative_position_bias.unsqueeze(0)

        if mask is not None:
            nW = mask.shape[0]
            attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0)
            attn = attn.view(-1, self.num_heads, N, N)
            attn = self.softmax(attn)
        else:
            attn = self.softmax(attn)

        attn = self.attn_drop(attn)

        x = (attn @ v).transpose(1, 2).reshape(B_, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

    def extra_repr(self) -> str:
        return f'dim={self.dim}, window_size={self.window_size}, num_heads={self.num_heads}'

    def flops(self, N):
        # calculate flops for 1 window with token length of N
        flops = 0
        # qkv = self.qkv(x)
        flops += N * self.dim * 3 * self.dim
        # attn = (q @ k.transpose(-2, -1))
        flops += self.num_heads * N * (self.dim // self.num_heads) * N
        #  x = (attn @ v)
        flops += self.num_heads * N * N * (self.dim // self.num_heads)
        # x = self.proj(x)
        flops += N * self.dim * self.dim
        return flops

In [8]:
class SwinTransformerBlock(nn.Module):
    r""" Swin Transformer Block.
    Args:
        dim (int): Number of input channels.
        input_resolution (tuple[int]): Input resulotion.
        num_heads (int): Number of attention heads.
        window_size (int): Window size.
        shift_size (int): Shift size for SW-MSA.
        mlp_ratio (float): Ratio of mlp hidden dim to embedding dim.
        qkv_bias (bool, optional): If True, add a learnable bias to query, key, value. Default: True
        qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set.
        drop (float, optional): Dropout rate. Default: 0.0
        attn_drop (float, optional): Attention dropout rate. Default: 0.0
        drop_path (float, optional): Stochastic depth rate. Default: 0.0
        act_layer (nn.Module, optional): Activation layer. Default: nn.GELU
        norm_layer (nn.Module, optional): Normalization layer.  Default: nn.LayerNorm
    """

    def __init__(self, dim, input_resolution, num_heads, window_size=7, shift_size=0,
                 mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0., drop_path=0.,
                 act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        self.dim = dim
        self.input_resolution = input_resolution
        self.num_heads = num_heads
        self.window_size = window_size
        self.shift_size = shift_size
        self.mlp_ratio = mlp_ratio
        if min(self.input_resolution) <= self.window_size:
            # if window size is larger than input resolution, we don't partition windows
            self.shift_size = 0
            self.window_size = min(self.input_resolution)
        assert 0 <= self.shift_size < self.window_size, "shift_size must in 0-window_size"

        self.norm1 = norm_layer(dim)
        self.attn = WindowAttention(
            dim, window_size=to_2tuple(self.window_size), num_heads=num_heads,
            qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop)

        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop)

        if self.shift_size > 0:
            # calculate attention mask for SW-MSA
            H, W = self.input_resolution
            img_mask = torch.zeros((1, H, W, 1))  # 1 H W 1
            h_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            w_slices = (slice(0, -self.window_size),
                        slice(-self.window_size, -self.shift_size),
                        slice(-self.shift_size, None))
            cnt = 0
            for h in h_slices:
                for w in w_slices:
                    img_mask[:, h, w, :] = cnt
                    cnt += 1

            mask_windows = window_partition(img_mask, self.window_size)  # nW, window_size, window_size, 1
            mask_windows = mask_windows.view(-1, self.window_size * self.window_size)
            # print("555555----------", mask_windows.shape)
            attn_mask = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2)
            attn_mask = attn_mask.masked_fill(attn_mask != 0, float(-100.0)).masked_fill(attn_mask == 0, float(0.0))
        else:
            attn_mask = None

        self.register_buffer("attn_mask", attn_mask)

    def forward(self, x):
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "input feature has wrong size"

        shortcut = x
        x = self.norm1(x)
        x = x.view(B, H, W, C)
        # print("22222---------", x.shape)

        # cyclic shift
        if self.shift_size > 0:
            shifted_x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2))
        else:
            shifted_x = x

        # partition windows
        x_windows = window_partition(shifted_x, self.window_size)  # nW*B, window_size, window_size, C
        x_windows = x_windows.view(-1, self.window_size * self.window_size, C)  # nW*B, window_size*window_size, C

        # W-MSA/SW-MSA
        attn_windows = self.attn(x_windows, mask=self.attn_mask)  # nW*B, window_size*window_size, C

        # merge windows
        attn_windows = attn_windows.view(-1, self.window_size, self.window_size, C)
        shifted_x = window_reverse(attn_windows, self.window_size, H, W)  # B H' W' C

        # reverse cyclic shift
        if self.shift_size > 0:
            x = torch.roll(shifted_x, shifts=(self.shift_size, self.shift_size), dims=(1, 2))
        else:
            x = shifted_x
        x = x.view(B, H * W, C)

        # FFN
        x = shortcut + self.drop_path(x)
        x = x + self.drop_path(self.mlp(self.norm2(x)))

        return x

    def extra_repr(self) -> str:
        return f"dim={self.dim}, input_resolution={self.input_resolution}, num_heads={self.num_heads}, " \
               f"window_size={self.window_size}, shift_size={self.shift_size}, mlp_ratio={self.mlp_ratio}"

    def flops(self):
        flops = 0
        H, W = self.input_resolution
        # norm1
        flops += self.dim * H * W
        # W-MSA/SW-MSA
        nW = H * W / self.window_size / self.window_size
        flops += nW * self.attn.flops(self.window_size * self.window_size)
        # mlp
        flops += 2 * H * W * self.dim * self.dim * self.mlp_ratio
        # norm2
        flops += self.dim * H * W
        return flops

In [9]:
class PatchMerging(nn.Module):
    r""" Patch Merging Layer.
    Args:
        input_resolution (tuple[int]): Resolution of input feature.
        dim (int): Number of input channels.
        norm_layer (nn.Module, optional): Normalization layer.  Default: nn.LayerNorm
    """

    def __init__(self, input_resolution, dim, norm_layer=nn.LayerNorm):
        super().__init__()
        self.input_resolution = input_resolution
        self.dim = dim
        self.reduction = nn.Linear(4 * dim, 2 * dim, bias=False)
        self.norm = norm_layer(4 * dim)

    def forward(self, x):
        """
        x: B, H*W, C
        """
        H, W = self.input_resolution
        B, L, C = x.shape
        assert L == H * W, "input feature has wrong size"
        assert H % 2 == 0 and W % 2 == 0, f"x size ({H}*{W}) are not even."

        x = x.view(B, H, W, C)

        x0 = x[:, 0::2, 0::2, :]  # B H/2 W/2 C
        x1 = x[:, 1::2, 0::2, :]  # B H/2 W/2 C
        x2 = x[:, 0::2, 1::2, :]  # B H/2 W/2 C
        x3 = x[:, 1::2, 1::2, :]  # B H/2 W/2 C
        x = torch.cat([x0, x1, x2, x3], -1)  # B H/2 W/2 4*C
        x = x.view(B, -1, 4 * C)  # B H/2*W/2 4*C

        x = self.norm(x)
        x = self.reduction(x)

        return x

    def extra_repr(self) -> str:
        return f"input_resolution={self.input_resolution}, dim={self.dim}"

    def flops(self):
        H, W = self.input_resolution
        flops = H * W * self.dim
        flops += (H // 2) * (W // 2) * 4 * self.dim * 2 * self.dim
        return flops


In [10]:
class BasicLayer(nn.Module):
    """ A basic Swin Transformer layer for one stage.
    Args:
        dim (int): Number of input channels.
        input_resolution (tuple[int]): Input resolution.
        depth (int): Number of blocks.
        num_heads (int): Number of attention heads.
        window_size (int): Local window size.
        mlp_ratio (float): Ratio of mlp hidden dim to embedding dim.
        qkv_bias (bool, optional): If True, add a learnable bias to query, key, value. Default: True
        qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set.
        drop (float, optional): Dropout rate. Default: 0.0
        attn_drop (float, optional): Attention dropout rate. Default: 0.0
        drop_path (float | tuple[float], optional): Stochastic depth rate. Default: 0.0
        norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm
        downsample (nn.Module | None, optional): Downsample layer at the end of the layer. Default: None
        use_checkpoint (bool): Whether to use checkpointing to save memory. Default: False.
    """

    def __init__(self, dim, input_resolution, depth, num_heads, window_size,
                 mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0.,
                 drop_path=0., norm_layer=nn.LayerNorm, downsample=None, use_checkpoint=False):

        super().__init__()
        self.dim = dim
        self.input_resolution = input_resolution
        self.depth = depth
        self.use_checkpoint = use_checkpoint

        # build blocks
        self.blocks = nn.ModuleList([
            SwinTransformerBlock(dim=dim, input_resolution=input_resolution,
                                 num_heads=num_heads, window_size=window_size,
                                 shift_size=0 if (i % 2 == 0) else window_size // 2,
                                 mlp_ratio=mlp_ratio,
                                 qkv_bias=qkv_bias, qk_scale=qk_scale,
                                 drop=drop, attn_drop=attn_drop,
                                 drop_path=drop_path[i] if isinstance(drop_path, list) else drop_path,
                                 norm_layer=norm_layer)
            for i in range(depth)])

        # patch merging layer
        if downsample is not None:
            self.downsample = downsample(input_resolution, dim=dim, norm_layer=norm_layer)
        else:
            self.downsample = None

    def forward(self, x):
        for blk in self.blocks:
            if self.use_checkpoint:
                x = checkpoint.checkpoint(blk, x)
            else:
                x = blk(x)
        if self.downsample is not None:
            x = self.downsample(x)
        return x

    def extra_repr(self) -> str:
        return f"dim={self.dim}, input_resolution={self.input_resolution}, depth={self.depth}"

    def flops(self):
        flops = 0
        for blk in self.blocks:
            flops += blk.flops()
        if self.downsample is not None:
            flops += self.downsample.flops()
        return flops

In [11]:
class PatchEmbed(nn.Module):
    r""" Image to Patch Embedding
    Args:
        img_size (int): Image size.  Default: 224.
        patch_size (int): Patch token size. Default: 4.
        in_chans (int): Number of input image channels. Default: 3.
        embed_dim (int): Number of linear projection output channels. Default: 96.
        norm_layer (nn.Module, optional): Normalization layer. Default: None
    """

    def __init__(self, img_size=224, patch_size=4, in_chans=3, embed_dim=96, norm_layer=None):
        super().__init__()
        img_size = to_2tuple(img_size)
        patch_size = to_2tuple(patch_size)
        patches_resolution = [img_size[0] // patch_size[0], img_size[1] // patch_size[1]]
        self.img_size = img_size
        self.patch_size = patch_size
        self.patches_resolution = patches_resolution
        self.num_patches = patches_resolution[0] * patches_resolution[1]

        self.in_chans = in_chans
        self.embed_dim = embed_dim

        self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size)
        if norm_layer is not None:
            self.norm = norm_layer(embed_dim)
        else:
            self.norm = None

    def forward(self, x):
        B, C, H, W = x.shape
        # FIXME look at relaxing size constraints
        assert H == self.img_size[0] and W == self.img_size[1], \
            f"Input image size ({H}*{W}) doesn't match model ({self.img_size[0]}*{self.img_size[1]})."
        x = self.proj(x).flatten(2).transpose(1, 2)  # B Ph*Pw C
        #print("11111-----------",x.shape)
        if self.norm is not None:
            x = self.norm(x)
        return x

    def flops(self):
        Ho, Wo = self.patches_resolution
        flops = Ho * Wo * self.embed_dim * self.in_chans * (self.patch_size[0] * self.patch_size[1])
        if self.norm is not None:
            flops += Ho * Wo * self.embed_dim
        return flops

In [12]:
class SwinTransformer(nn.Module):
    r""" Swin Transformer
        A PyTorch impl of : `Swin Transformer: Hierarchical Vision Transformer using Shifted Windows`  -
          https://arxiv.org/pdf/2103.14030
    Args:
        img_size (int | tuple(int)): Input image size. Default 224
        patch_size (int | tuple(int)): Patch size. Default: 4
        in_chans (int): Number of input image channels. Default: 3
        num_classes (int): Number of classes for classification head. Default: 1000
        embed_dim (int): Patch embedding dimension. Default: 96
        depths (tuple(int)): Depth of each Swin Transformer layer.
        num_heads (tuple(int)): Number of attention heads in different layers.
        window_size (int): Window size. Default: 7
        mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. Default: 4
        qkv_bias (bool): If True, add a learnable bias to query, key, value. Default: True
        qk_scale (float): Override default qk scale of head_dim ** -0.5 if set. Default: None
        drop_rate (float): Dropout rate. Default: 0
        attn_drop_rate (float): Attention dropout rate. Default: 0
        drop_path_rate (float): Stochastic depth rate. Default: 0.1
        norm_layer (nn.Module): Normalization layer. Default: nn.LayerNorm.
        ape (bool): If True, add absolute position embedding to the patch embedding. Default: False
        patch_norm (bool): If True, add normalization after patch embedding. Default: True
        use_checkpoint (bool): Whether to use checkpointing to save memory. Default: False
    """

    def __init__(self, img_size=64, patch_size=4, in_chans=1, num_classes=27,
                 embed_dim=28, depths=[2, 2, 6, 2], num_heads=[3, 6, 12, 24],
                 window_size=8, mlp_ratio=4., qkv_bias=True, qk_scale=None,
                 drop_rate=0., attn_drop_rate=0., drop_path_rate=0.1,
                 norm_layer=nn.LayerNorm, ape=False, patch_norm=True,
                 use_checkpoint=False, **kwargs):
        super().__init__()

        self.num_classes = num_classes
        self.num_layers = len(depths)
        self.embed_dim = embed_dim
        self.ape = ape
        self.patch_norm = patch_norm
        self.num_features = int(embed_dim * 2 ** (self.num_layers - 1))
        self.mlp_ratio = mlp_ratio

        # split image into non-overlapping patches
        self.patch_embed = PatchEmbed(
            img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim,
            norm_layer=norm_layer if self.patch_norm else None)
        num_patches = self.patch_embed.num_patches
        patches_resolution = self.patch_embed.patches_resolution
        self.patches_resolution = patches_resolution

        # absolute position embedding
        if self.ape:
            self.absolute_pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dim))
            trunc_normal_(self.absolute_pos_embed, std=.02)

        self.pos_drop = nn.Dropout(p=drop_rate)

        # stochastic depth
        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))]  # stochastic depth decay rule

        # build layers
        self.layers = nn.ModuleList()
        for i_layer in range(self.num_layers):
            layer = BasicLayer(dim=int(embed_dim * 2 ** i_layer),
                               input_resolution=(patches_resolution[0] // (2 ** i_layer),
                                                 patches_resolution[1] // (2 ** i_layer)),
                               depth=depths[i_layer],
                               num_heads=num_heads[i_layer],
                               window_size=window_size,
                               mlp_ratio=self.mlp_ratio,
                               qkv_bias=qkv_bias, qk_scale=qk_scale,
                               drop=drop_rate, attn_drop=attn_drop_rate,
                               drop_path=dpr[sum(depths[:i_layer]):sum(depths[:i_layer + 1])],
                               norm_layer=norm_layer,
                               downsample=PatchMerging if (i_layer < self.num_layers - 1) else None,
                               use_checkpoint=use_checkpoint)
            self.layers.append(layer)

        self.norm = norm_layer(self.num_features)
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity()

        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            trunc_normal_(m.weight, std=.02)
            if isinstance(m, nn.Linear) and m.bias is not None:
                nn.init.constant_(m.bias, 0)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    @torch.jit.ignore
    def no_weight_decay(self):
        return {'absolute_pos_embed'}

    @torch.jit.ignore
    def no_weight_decay_keywords(self):
        return {'relative_position_bias_table'}

    def forward_features(self, x):
        x = self.patch_embed(x)
        if self.ape:
            x = x + self.absolute_pos_embed
        x = self.pos_drop(x)
        for layer in self.layers:
            #print("adsf")
            x = layer(x)
        x = self.norm(x)  # B L C
        x = self.avgpool(x.transpose(1, 2))  # B C 1
        x = torch.flatten(x, 1)
        return x

    def forward(self, x):
        x = self.forward_features(x)
        x = self.head(x)
        return x

    def flops(self):
        flops = 0
        flops += self.patch_embed.flops()
        for i, layer in enumerate(self.layers):
            flops += layer.flops()
        flops += self.num_features * self.patches_resolution[0] * self.patches_resolution[1] // (2 ** self.num_layers)
        flops += self.num_features * self.num_classes
        return flops

In [13]:
model = SwinTransformer(img_size=64,
                                patch_size=4,
                                in_chans=1,
                                num_classes=150,
                                embed_dim=96,
                                depths=[2, 2, 6, 2],
                                num_heads=[ 3, 6, 12, 24 ],
                                window_size=4,
                                mlp_ratio=4.,
                                qkv_bias=True,
                                qk_scale=True,
                                drop_rate=0.0,
                                drop_path_rate=0.2,
                                ape=False,
                                patch_norm=True,
                                use_checkpoint=False)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [14]:
idx2large = EasyDict({
    '0':'구이',
    '1':'국',
    '2':'기타',
    '3':'김치',
    '4':'나물',
    '5':'떡',
    '6':'만두',
    '7':'면',
    '8':'무침',
    '9':'밥',
    '10':'볶음',
    '11':'쌈',
    '12':'음청류',
    '13':'장',
    '14':'장아찌',
    '15':'적',
    '16':'전',
    '17':'전골',
    '18':'조림',
    '19':'죽',
    '20':'찌개',
    '21':'찜',
    '22':'탕',
    '23':'튀김',
    '24':'한과',
    '25':'해물',
    '26':'회'
})

In [15]:
large2idx = {large[-1]:idx for idx, large in enumerate(idx2large.items())}

In [16]:
class2idx = {'구이/갈비구이': 0, '구이/갈치구이': 1, '구이/고등어구이': 2, '구이/곱창구이': 3, '구이/닭갈비': 4, '구이/더덕구이': 5, '구이/떡갈비': 6, '구이/불고기': 7, '구이/삼겹살': 8, '구이/장어구이': 9, '구이/조개구이': 10, '구이/조기구이': 11, '구이/황태구이': 12, '구이/훈제오리': 13, '국/계란국': 14, '국/떡국_만두국': 15, '국/무국': 16, '국/미역국': 17, '국/북엇국': 18, '국/시래기국': 19, '국/육개장': 20, '국/콩나물국': 21, '기타/과메기': 22, '기타/양념치킨': 23, '기타/젓갈': 24, '기타/콩자반': 25, '기타/편육': 26, '기타/피자': 27, '기타/후라이드치킨': 28, '김치/갓김치': 29, '김치/깍두기': 30, '김치/나박김치': 31, '김치/무생채': 32, '김치/배추김치': 33, '김치/백김치': 34, '김치/부추김치': 35, '김치/열무김치': 36, '김치/오이소박이': 37, '김치/총각김치': 38, '김치/파김치': 39, '나물/가지볶음': 40, '나물/고사리나물': 41, '나물/미역줄기볶음': 42, '나물/숙주나물': 43, '나물/시금치나물': 44, '나물/애호박볶음': 45, '떡/경단': 46, '떡/꿀떡': 47, '떡/송편': 48, '만두/만두': 49, '면/라면': 50, '면/막국수': 51, '면/물냉면': 52, '면/비빔냉면': 53, '면/수제비': 54, '면/열무국수': 55, '면/잔치국수': 56, '면/짜장면': 57, '면/짬뽕': 58, '면/쫄면': 59, '면/칼국수': 60, '면/콩국수': 61, '무침/꽈리고추무침': 62, '무침/도라지무침': 63, '무침/도토리묵': 64, '무침/잡채': 65, '무침/콩나물무침': 66, '무침/홍어무침': 67, '무침/회무침': 68, '밥/김밥': 69, '밥/김치볶음밥': 70, '밥/누룽지': 71, '밥/비빔밥': 72, '밥/새우볶음밥': 73, '밥/알밥': 74, '밥/유부초밥': 75, '밥/잡곡밥': 76, '밥/주먹밥': 77, '볶음/감자채볶음': 78, '볶음/건새우볶음': 79, '볶음/고추장진미채볶음': 80, '볶음/두부김치': 81, '볶음/떡볶이': 82, '볶음/라볶이': 83, '볶음/멸치볶음': 84, '볶음/소세지볶음': 85, '볶음/어묵볶음': 86, '볶음/오징어채볶음': 87, '볶음/제육볶음': 88, '볶음/주꾸미볶음': 89, '쌈/보쌈': 90, '음청류/수정과': 91, '음청류/식혜': 92, '장/간장게장': 93, '장/양념게장': 94, '장아찌/깻잎장아찌': 95, '적/떡꼬치': 96, '전/감자전': 97, '전/계란말이': 98, '전/계란후라이': 99, '전/김치전': 100, '전/동그랑땡': 101, '전/생선전': 102, '전/파전': 103, '전/호박전': 104, '전골/곱창전골': 105, '조림/갈치조림': 106, '조림/감자조림': 107, '조림/고등어조림': 108, '조림/꽁치조림': 109, '조림/두부조림': 110, '조림/땅콩조림': 111, '조림/메추리알장조림': 112, '조림/연근조림': 113, '조림/우엉조림': 114, '조림/장조림': 115, '조림/코다리조림': 116, '죽/전복죽': 117, '죽/호박죽': 118, '찌개/김치찌개': 119, '찌개/닭계장': 120, '찌개/동태찌개': 121, '찌개/된장찌개': 122, '찌개/순두부찌개': 123, '찜/갈비찜': 124, '찜/계란찜': 125, '찜/김치찜': 126, '찜/꼬막찜': 127, '찜/닭볶음탕': 128, '찜/수육': 129, '찜/순대': 130, '찜/족발': 131, '찜/찜닭': 132, '찜/해물찜': 133, '탕/갈비탕': 134, '탕/감자탕': 135, '탕/곰탕_설렁탕': 136, '탕/매운탕': 137, '탕/삼계탕': 138, '탕/추어탕': 139, '튀김/고추튀김': 140, '튀김/새우튀김': 141, '튀김/오징어튀김': 142, '한과/약과': 143, '한과/약식': 144, '한과/한과': 145, '해물/멍게': 146, '해물/산낙지': 147, '회/물회': 148, '회/육회': 149}
idx2class = {v:k.split('/')[-1] for k,v in class2idx.items()}

In [17]:
def convert_to_large_id(s_id) :
  '''
  input : small_id
  output : large_id
  '''
  if 0<=s_id and s_id<=13 :
    return 0
  elif 14<=s_id and s_id<=21 : 
    return 1
  elif 22<=s_id and s_id<=28 :
    return 2
  elif 29<=s_id and s_id<=39 :
    return 3
  elif 40<=s_id and s_id<=45 :
    return 4
  elif 46<=s_id and s_id<=48 :
    return 5
  elif s_id == 49 : 
    return 6
  elif 50<=s_id and s_id<=61 :
    return 7
  elif 62<=s_id and s_id<=68 :
    return 8
  elif 69<=s_id and s_id<=77 :
    return 9
  elif 78<=s_id and s_id<=89 :
    return 10
  elif s_id==90 :
    return 11
  elif 91<=s_id and s_id<=92 : 
    return 12
  elif 93<=s_id and s_id<=94 :
    return 13
  elif s_id==95 :
    return 14
  elif s_id==96 :
    return 15
  elif 97<=s_id and s_id<=104 :
    return 16
  elif s_id==105 :
    return 17
  elif 106<=s_id and s_id<=116 :
    return 18
  elif 117<=s_id and s_id<=118 :
    return 19
  elif 119<=s_id and s_id<=123 :
    return 20
  elif 124<=s_id and s_id<=133 :
    return 21
  elif 134<=s_id and s_id<=139 :
    return 22
  elif 140<=s_id and s_id<=142 :
    return 23
  elif 143<=s_id and s_id<=145 :
    return 24
  elif 146<=s_id and s_id<=147 :
    return 25
  else :
    return 26
  return o

In [18]:
correct_id = dict()
l = [0,14,22,29,40,46,49,50,62,69,78,90,91,93,95,96,97,105,106,117,119,124,134,140,143,146,148]
for idx, i in enumerate(large2idx) :
  correct_id[i] = l[idx]

In [19]:
class Food_DataSet(torch.utils.data.Dataset) : 
    def __init__(self, args, is_validated) :
        self.args = args
        if is_validated == True:
          self.args.mode = 'val'
          data_path = args.val_path
        else :
          data_path = args.data_path

        with open(data_path, 'rb') as f:
          self.data = pickle.load(f)
        
    def __len__(self) : 
        return len(self.data)
        
    def __getitem__(self, i) :
        return self.data[i]

In [20]:
class INFERENCE_Dataset(Dataset):
    def __init__(self, args, df):
        self.args = args
        self.data = []

        x = df
        del df
        x = x.values.reshape(-1,1,64,64)

        for i in range(len(x)):
            self.data.append(x[i])
            if(i % 1000 == 0):
                print(i)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        return self.data[i]

In [21]:
class Food_DataLoader(object) :
    def __init__(self, args, is_validated=False, df=None):
        super().__init__()

        if df is None:
            dataset = Food_DataSet(args, is_validated)
        else:
            dataset = INFERENCE_Dataset(args, df)

        if args.mode == 'train':
            self.data_loader = torch.utils.data.DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
        else:
            self.data_loader = torch.utils.data.DataLoader(dataset, batch_size=args.batch_size)
        
        self.data_iter = self.data_loader.__iter__()
    
    def next_batch(self):
        try:
            batch = self.data_iter.__next__()
        except StopIteration:
            self.data_iter = self.data_loader.__iter__()
            batch = self.data_iter.__next__()
        
        return batch

In [22]:
class Trainer():
  def __init__(self, args):
    self.args = args
    
    if self.args.mode != 'inference':
        self.loader = Food_DataLoader(args, is_validated=False)
    self.model = model
    model.load_state_dict(torch.load('swin\\checkpoint_6.pt'))
    if self.args.device == 'cuda':
        self.model.cuda()

    self.criterion = nn.CrossEntropyLoss()
    self.optim = torch.optim.Adam(self.model.parameters())
      
  def train(self):
    # train
    total_step = len(self.loader.data_loader)
    val_loader = Food_DataLoader(self.args, is_validated=True)
    opt_epoch = 1
    min_val_loss = 1e9

    for epoch in range(6, self.args.epoch):
      self.model.train()
      total_loss = 0

      for i in range(len(self.loader.data_loader)):
          feature, label = self.loader.next_batch()
          feature = torch.tensor(feature).to(device=self.args.device, dtype=torch.float)
          label = torch.tensor(label).to(device=self.args.device)

          pred = self.model(feature)
          loss = self.criterion(pred, label)
          
          self.optim.zero_grad()
          loss.backward()
          self.optim.step()

          total_loss += loss.item()
          if (i + 1) % 1000 == 0:
              print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, self.args.epoch, i + 1, total_step, loss.item()))

      #if (epoch+1)%50 == 0:
      val_loss = 0
      val_loss = self.validate(val_loader)
      print('Validation Loss: {:.20f}'.format(val_loss))
      if val_loss < min_val_loss:
        min_val_loss = val_loss
        opt_epoch = epoch+1
        torch.save(self.model.state_dict(), 'swin\\checkpoint_final.pt')
      torch.save(self.model.state_dict(), 'swin\\checkpoint_'+str(epoch+1)+'.pt')

    print("Best Epoch : {}".format(opt_epoch))

  def validate(self, val_loader):
      total_loss = 0
      total_step = len(val_loader.data_loader)

      self.model.eval()
      with torch.no_grad():
          for i in range(len(val_loader.data_loader)):
              feature, label = val_loader.next_batch()
              feature = torch.tensor(feature).to(device=self.args.device, dtype=torch.float)
              label = torch.tensor(label).to(device=self.args.device)

              pred = self.model(feature)
              loss = self.criterion(pred, label)
              total_loss += loss.item()

              if (i + 1) % 100 == 0:
                  print('Validation Step [{}/{}], Loss: {:.4f}'.format(i + 1, total_step, loss.item()))

      return total_loss/total_step


  def test(self):
      # test
      if self.args.device == 'cuda':
          self.model.load_state_dict(torch.load(self.args.ckpt))
      else:
          self.model.load_state_dict(torch.load(self.args.ckpt, map_location=torch.device('cpu')))
      
      self.model.eval()

      pred_list = list()
      label_list = list()

      with torch.no_grad():
          for i in range(len(self.loader.data_loader)):
              feature, label = self.loader.next_batch()
              feature = torch.tensor(feature).to(device=self.args.device, dtype=torch.float)
              label = torch.tensor(label).to(device=self.args.device)

              pred = self.model(feature)
              pred = F.softmax(pred, dim=1)
              pred = torch.argmax(pred, dim=1)
              
              pred_list.extend(pred.tolist())
              label_list.extend(label.tolist())

      acc = accuracy_score(label_list, pred_list)
      print(acc)
  
  def inference(self, df):
      # inference
      if self.args.device == 'cuda':
          self.model.load_state_dict(torch.load(self.args.ckpt))
      else:
          self.model.load_state_dict(torch.load(self.args.ckpt, map_location=torch.device('cpu')))
      self.model.eval()
      
      loader = Food_DataLoader(self.args, df=df)

      pred_list = list()
      label_list = list()

      with torch.no_grad():
          for i in range(len(loader.data_loader)):
              feature = loader.next_batch()
              feature = torch.tensor(feature).to(device=self.args.device, dtype=torch.float)

              pred = self.model(feature)
              pred = F.softmax(pred, dim=1)
              pred = torch.argmax(pred, dim=1)
              
              pred_list.extend(pred.tolist())

      return pred_list

In [23]:
def get_args():
    args = EasyDict({
        "epoch":100,
        "batch_size":10,
        "mode":'train',
        "ckpt":1,
        "device":'cuda',
        "n_class":150,
        "data_path":'train.pkl',
        "val_path":'val.pkl'
    })
    return args
def get_args2() :
  args = EasyDict({
      "mode":'test',
      "batch_size":1,
      "ckpt":'swin\\checkpoint_final.pt',
      "device":'cuda',
      "n_class":150,
  })
  return args

In [None]:
args = get_args()
trainer = Trainer(args)

if args.mode == 'train':
    trainer.train()
else:
    trainer.test()