In [1]:
#step 1 import
import os

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.datasets as dsets # datasets
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader # customData, DataLoader

In [2]:
# step2) device: seed the RNGs and pick the GPU whenever one is visible
torch.manual_seed(777)
if torch.cuda.is_available():
    device = "cuda"
    torch.cuda.manual_seed_all(777)  # seed every visible GPU as well
else:
    device = "cpu"
print(device)

# step3) hyper-parameter
learning_rate = 0.001  # step size handed to the optimizer
batch_size = 64        # samples per mini-batch for both loaders
training_epoch = 100   # number of passes over the training set

# MNIST is single-channel, so Normalize takes one value per statistic.
# `(0.1307)` is only a parenthesised float; the trailing comma makes a real tuple.
# NOTE(review): the commonly quoted MNIST std is 0.3081. The original value 0.2890
# is kept unchanged here - confirm which one is intended.
MNIST_MEAN = (0.1307,)
MNIST_STD = (0.2890,)

# Training pipeline: random augmentation, then upsample the 28x28 digit to 112x112.
# NOTE(review): horizontal flips produce mirrored digits; kept as in the original,
# but confirm that this augmentation is really wanted for MNIST.
train_trans = transforms.Compose(
    [
     transforms.RandomHorizontalFlip(),
     transforms.RandomResizedCrop(112),
     transforms.ToTensor(),
     transforms.Normalize(MNIST_MEAN, MNIST_STD)
    ]
)

# Evaluation pipeline: deterministic. The image is *resized* to 112 first, because
# CenterCrop(112) applied directly to a 28x28 image zero-pads it instead of scaling
# it, which would show the network digits far smaller than the ones it trained on.
test_trans = transforms.Compose(
    [
     transforms.Resize(112),
     transforms.CenterCrop(112),
     transforms.ToTensor(),
     transforms.Normalize(MNIST_MEAN, MNIST_STD)
    ]
)

# step4) Dataset & DataLoader
# Both splits are stored under MNIST_data/ and downloaded on first use.
mnist_train = dsets.MNIST(
    root="MNIST_data",
    train=True,
    download=True,
    transform=train_trans,
)
mnist_test = dsets.MNIST(
    root="MNIST_data",
    train=False,
    download=True,
    transform=test_trans,
)

# Training batches are shuffled and the trailing partial batch is dropped;
# evaluation keeps dataset order and every sample.
train_loader = DataLoader(
    mnist_train,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
)
test_loader = DataLoader(
    mnist_test,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
)



cuda
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to MNIST_data/MNIST/raw/train-images-idx3-ubyte.gz


  0%|          | 0/9912422 [00:00<?, ?it/s]

Extracting MNIST_data/MNIST/raw/train-images-idx3-ubyte.gz to MNIST_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to MNIST_data/MNIST/raw/train-labels-idx1-ubyte.gz


  0%|          | 0/28881 [00:00<?, ?it/s]

Extracting MNIST_data/MNIST/raw/train-labels-idx1-ubyte.gz to MNIST_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to MNIST_data/MNIST/raw/t10k-images-idx3-ubyte.gz


  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting MNIST_data/MNIST/raw/t10k-images-idx3-ubyte.gz to MNIST_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to MNIST_data/MNIST/raw/t10k-labels-idx1-ubyte.gz


  0%|          | 0/4542 [00:00<?, ?it/s]

Extracting MNIST_data/MNIST/raw/t10k-labels-idx1-ubyte.gz to MNIST_data/MNIST/raw



  return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)


In [3]:
#https://github.com/TreB1eN/InsightFace_Pytorch/blob/master/model.py
#https://norman3.github.io/papers/docs/arcface.html

from torch.nn import Linear, Conv2d, BatchNorm1d, BatchNorm2d, PReLU, ReLU, Sigmoid, Dropout2d, Dropout, AvgPool2d, MaxPool2d, AdaptiveAvgPool2d, Sequential, Module, Parameter
import math

#https://pytorch.org/docs/stable/generated/torch.norm.html
def l2_norm(input, axis = 1, eps = 1e-12):
  """Scale `input` to unit L2 (Euclidean) length along `axis`.

  Args:
    input: tensor to normalise.
    axis: dimension along which the vector length is measured.
    eps: lower bound applied to the length before dividing, so that an
      all-zero vector maps to zeros instead of NaN (0 / 0).

  Returns:
    A tensor with the same shape as `input` whose slices along `axis` have
    length 1 (or stay all-zero when the slice was all-zero).
  """
  # p=2 is the ordinary L2 norm; keepdim=True keeps the result broadcastable.
  norm = torch.norm(input, p = 2, dim = axis, keepdim = True)
  output = torch.div(input, norm.clamp_min(eps))
  return output


class ArcFace(nn.Module):
  """Additive angular margin (ArcFace) classification head.

  Holds one learnable direction per class in `kernel`. `forward` returns the
  scaled cosine between each L2-normalised embedding and each L2-normalised
  class direction; when labels are given, the cosine of the labelled class is
  replaced by cos(theta + m).

  Args:
    embedding_size: length of the incoming embedding vectors.
    classnum: number of classes (columns of `kernel`).
    s: scale multiplied onto the cosines before they are returned.
    m: angular margin in radians (0.5 rad is roughly 28.6 degrees).
  """
  def __init__(self,embedding_size = 512, classnum = 10, s = 64, m = 0.5):
    super(ArcFace, self).__init__()
    self.classnum = classnum
    # (embedding_size, classnum) matrix: one column per class.
    self.kernel = Parameter(torch.Tensor(embedding_size, classnum))
    # Uniform init followed by a per-column renormalisation.
    self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)

    self.m = m
    self.s = s
    self.cos_m = math.cos(m)
    self.sin_m = math.sin(m)
    # Linear penalty used when theta + m would leave [0, pi].
    self.mm = self.sin_m * m
    # cos(pi - m): cosines at or below this value have theta + m >= pi.
    self.threshold = math.cos(math.pi - m)

  def forward(self, embeddings, label = None):
    """Return scaled logits of shape (batch, classnum).

    Args:
      embeddings: (batch, embedding_size) backbone output.
      label: optional (batch,) tensor of class indices. When given, the margin
        is applied to each sample's labelled class. When omitted, plain scaled
        cosines are returned (no margin), which is what label-free inference
        needs.
    """
    # Unit-length class directions and embeddings. normalize() bounds the
    # divisor from below, so an all-zero vector yields zeros rather than NaN.
    kernel_norm = nn.functional.normalize(self.kernel, p = 2, dim = 0)
    embeddings = nn.functional.normalize(embeddings, p = 2, dim = 1)

    # Cosine between every embedding and every class direction.
    cos_theta = torch.mm(embeddings, kernel_norm).clamp(-1, 1)
    if label is None:
      return cos_theta * self.s

    # sin(theta) from cos(theta). The lower bound keeps sqrt away from 0, where
    # its derivative is infinite and would turn the gradients into NaN.
    sin_theta2 = (1.0 - torch.pow(cos_theta, 2)).clamp_min(1e-7)
    sin_theta = torch.sqrt(sin_theta2)

    # cos(theta + m) via the angle-addition formula.
    cos_theta_m = cos_theta * self.cos_m - sin_theta * self.sin_m

    # cos(theta + m) is only monotone while theta + m <= pi, i.e. while
    # cos(theta) > cos(pi - m). Outside that range fall back to the linear
    # penalty cos(theta) - m * sin(m).
    within_range = (cos_theta - self.threshold) > 0
    cos_theta_m = torch.where(within_range, cos_theta_m, cos_theta - self.mm)

    # Swap in the margin-adjusted value only at each sample's labelled class.
    # The row index is created on the same device as the logits.
    idx_ = torch.arange(0, len(embeddings), dtype = torch.long, device = cos_theta.device)
    output = cos_theta.clone()
    output[idx_, label] = cos_theta_m[idx_, label]
    return output * self.s


In [4]:
#https://github.com/pytorch/vision/blob/main/torchvision/models/mobilenetv2.py

import torch
from torch import nn
from torch import Tensor
#from .._internally_replaced_utils import load_state_dict_from_url
from typing import Callable, Any, Optional, List

# mobilenet version2
def _make_divisible(v: float, divisor: int, min_value: Optional[int] = None) -> int:
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class ConvBNActivation(nn.Sequential):
    """Bias-free ``Conv2d`` followed by a normalisation layer and an activation.

    ``norm_layer`` defaults to ``nn.BatchNorm2d`` and ``activation_layer`` to
    ``nn.ReLU6``. Padding is derived from ``kernel_size`` and ``dilation``.
    The output channel count is exposed as ``out_channels``.
    """

    def __init__(
        self,
        in_planes: int,
        out_planes: int,
        kernel_size: int = 3,
        stride: int = 1,
        groups: int = 1,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        activation_layer: Optional[Callable[..., nn.Module]] = None,
        dilation: int = 1,
    ) -> None:
        norm_factory = nn.BatchNorm2d if norm_layer is None else norm_layer
        act_factory = nn.ReLU6 if activation_layer is None else activation_layer
        pad = (kernel_size - 1) // 2 * dilation

        conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size,
            stride,
            pad,
            dilation=dilation,
            groups=groups,
            bias=False,
        )
        super().__init__(conv, norm_factory(out_planes), act_factory(inplace=True))
        self.out_channels = out_planes

# Alias kept for the MobileNetV2 building blocks that refer to the older name.
ConvBNReLU = ConvBNActivation

class InvertedResidual(nn.Module):
    """MobileNetV2 inverted-residual block.

    Layout: an optional 1x1 expansion conv (skipped when ``expand_ratio`` is 1),
    a 3x3 depthwise conv, then a 1x1 linear projection followed by a norm layer.
    The input is added back to the output only when ``stride`` is 1 and the
    input and output channel counts match.
    """

    def __init__(
        self,
        inp: int,
        oup: int,
        stride: int,
        expand_ratio: int,
        norm_layer: Optional[Callable[..., nn.Module]] = None
    ) -> None:
        super(InvertedResidual, self).__init__()
        self.stride = stride
        assert stride in [1, 2]

        norm_layer = nn.BatchNorm2d if norm_layer is None else norm_layer
        expanded = int(round(inp * expand_ratio))
        self.use_res_connect = self.stride == 1 and inp == oup

        stages: List[nn.Module] = []
        # pw: with expand_ratio == 1 there is nothing to expand, so the 1x1
        # expansion conv is left out.
        if expand_ratio != 1:
            stages.append(ConvBNReLU(inp, expanded, kernel_size=1, norm_layer=norm_layer))
        # dw: one group per channel
        stages.append(ConvBNReLU(expanded, expanded, stride=stride, groups=expanded, norm_layer=norm_layer))
        # pw-linear: projection without an activation
        stages.append(nn.Conv2d(expanded, oup, 1, 1, 0, bias=False))
        stages.append(norm_layer(oup))

        self.conv = nn.Sequential(*stages)
        self.out_channels = oup
        self._is_cn = stride > 1

    def forward(self, x: Tensor) -> Tensor:
        if not self.use_res_connect:
            return self.conv(x)
        return x + self.conv(x)


class MobileNetV2(nn.Module):
    def __init__(
        self,
        num_classes: int = 10,
        width_mult: float = 1.0,
        inverted_residual_setting: Optional[List[List[int]]] = None,
        round_nearest: int = 8,
        block: Optional[Callable[..., nn.Module]] = None,
        norm_layer: Optional[Callable[..., nn.Module]] = None,
        in_channels: int = 1,
        embedding_size: int = 512,
    ) -> None:
        """
        MobileNet V2 backbone that outputs an embedding vector instead of class scores.

        Args:
            num_classes (int): Kept for signature compatibility; this variant never reads it,
                because the head emits an embedding rather than per-class scores
            width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
            inverted_residual_setting: Network structure, one ``[t, c, n, s]`` row per stage
                (expand ratio, output channels, number of blocks, stride of the first block)
            round_nearest (int): Round the number of channels in each layer to be a multiple of this number
            Set to 1 to turn off rounding
            block: Module specifying inverted residual building block for mobilenet
            norm_layer: Module specifying the normalization layer to use
            in_channels (int): Number of channels of the input image (1 for grayscale)
            embedding_size (int): Length of the embedding produced by the head

        Raises:
            ValueError: if ``inverted_residual_setting`` is empty or its first row does not
                have exactly four elements
        """
        super(MobileNetV2, self).__init__()

        if block is None:
            block = InvertedResidual

        if norm_layer is None:
            norm_layer = nn.BatchNorm2d

        input_channel = 32
        last_channel = 1280

        if inverted_residual_setting is None:
            inverted_residual_setting = [
                # t, c, n, s
                [1, 16, 1, 1],
                [6, 24, 2, 2],
                [6, 32, 3, 2],
                [6, 64, 4, 2],
                [6, 96, 3, 1],
                [6, 160, 3, 2],
                [6, 320, 1, 1],
            ]

        # only check the first element, assuming user knows t,c,n,s are required
        if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
            raise ValueError("inverted_residual_setting should be non-empty "
                             "or a 4-element list, got {}".format(inverted_residual_setting))

        # building first layer
        input_channel = _make_divisible(input_channel * width_mult, round_nearest)
        self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
        # The stem consumes `in_channels` input planes (previously fixed to 1).
        features: List[nn.Module] = [ConvBNReLU(in_channels, input_channel, stride=2, norm_layer=norm_layer)]

        # building inverted residual blocks
        for t, c, n, s in inverted_residual_setting:
            output_channel = _make_divisible(c * width_mult, round_nearest)
            for i in range(n):
                # only the first block of a stage applies the stage stride
                stride = s if i == 0 else 1
                features.append(block(input_channel, output_channel, stride, expand_ratio=t, norm_layer=norm_layer))
                input_channel = output_channel
        # building last several layers
        features.append(ConvBNReLU(input_channel, self.last_channel, kernel_size=1, norm_layer=norm_layer))

        # make it nn.Sequential
        self.features = nn.Sequential(*features)

        # Embedding head sized for the ArcFace layer that consumes its output:
        # dropout -> linear projection -> batch norm (width was previously fixed to 512).
        self.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(self.last_channel, embedding_size),
            nn.BatchNorm1d(embedding_size),
        )

        # weight initialization
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out')
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

    def _forward_impl(self, x: Tensor) -> Tensor:
        # This exists since TorchScript doesn't support inheritance, so the superclass method
        # (this one) needs to have a name other than `forward` that can be accessed in a subclass
        x = self.features(x)
        # Cannot use "squeeze" as batch-size can be 1
        x = nn.functional.adaptive_avg_pool2d(x, (1, 1))
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def forward(self, x: Tensor) -> Tensor:
        """Map a batch of images to a batch of embedding vectors."""
        return self._forward_impl(x)

In [5]:
# The backbone is MobileNetV2; ArcFace is the margin-based classification head.
model = MobileNetV2().to(device)
Arc = ArcFace(classnum = 10).to(device)

# step6) loss & optim
criterion = nn.CrossEntropyLoss().to(device)
# ArcFace owns a learnable class-direction matrix (Arc.kernel). It has to be
# handed to the optimizer together with the backbone weights; otherwise the
# class directions would stay frozen at their random initial values.
trainable_params = list(model.parameters()) + list(Arc.parameters())
optimizer = optim.Adam(trainable_params, lr = learning_rate, weight_decay = 5e-4)
# Multiply the learning rate by 0.9 every 10 scheduler steps.
lr_sche = optim.lr_scheduler.StepLR(optimizer, 10, gamma= 0.9)


In [11]:
from torchsummary import summary
# Print a layer-by-layer summary of the backbone for a 1-channel 112x112 input.
summary(model, (1,112,112))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 32, 56, 56]             288
       BatchNorm2d-2           [-1, 32, 56, 56]              64
             ReLU6-3           [-1, 32, 56, 56]               0
            Conv2d-4           [-1, 32, 56, 56]             288
       BatchNorm2d-5           [-1, 32, 56, 56]              64
             ReLU6-6           [-1, 32, 56, 56]               0
            Conv2d-7           [-1, 16, 56, 56]             512
       BatchNorm2d-8           [-1, 16, 56, 56]              32
  InvertedResidual-9           [-1, 16, 56, 56]               0
           Conv2d-10           [-1, 96, 56, 56]           1,536
      BatchNorm2d-11           [-1, 96, 56, 56]             192
            ReLU6-12           [-1, 96, 56, 56]               0
           Conv2d-13           [-1, 96, 28, 28]             864
      BatchNorm2d-14           [-1, 96,

In [None]:
# Per-epoch history: the training cell appends one value to each list per
# epoch and later zips them into the results table.
train_loss, train_acc = [], []
test_loss, test_acc = [], []

In [None]:
# step7,8) train & test
# torch.save does not create missing directories, so make sure the target exists.
os.makedirs("./weight", exist_ok=True)
iteration = len(train_loader)
best_loss = float("inf")

for epoch in range(training_epoch):
    # ---- train for one epoch ----
    model.train()
    loss = 0.0
    correct = 0
    seen = 0
    for X_train, Y_train in train_loader:
        X_train = X_train.to(device)
        Y_train = Y_train.to(device)

        optimizer.zero_grad()
        # Two stages: the backbone produces embeddings, ArcFace turns them into logits.
        hypothesis = model(X_train)
        hypothesis = Arc(hypothesis, Y_train)

        cost = criterion(hypothesis, Y_train)
        cost.backward()
        optimizer.step()

        # Accumulate plain Python numbers so nothing keeps GPU tensors alive.
        loss += cost.item()
        correct += (torch.argmax(hypothesis, dim = 1) == Y_train).sum().item()
        seen += Y_train.size(0)

    # The scheduler is stepped after the epoch's optimizer steps; stepping it
    # first would skip the first value of the learning-rate schedule.
    lr_sche.step()

    loss /= iteration
    acc = correct / seen
    print("[Epoch {:04d}], Loss = {:.4f}, acc = {:.2f}".format(epoch+1, loss, acc*100))

    # ---- evaluate on the test split ----
    # Arc is still called with the labels, so the margin is applied to the true
    # class here exactly as it is during training.
    model.eval()
    t_loss = 0.0
    t_correct = 0
    t_seen = 0
    with torch.no_grad():
        for X_test, Y_test in test_loader:
            X_test = X_test.to(device)
            Y_test = Y_test.to(device)

            pred = model(X_test)
            pred = Arc(pred, Y_test)

            t_cost = criterion(pred, Y_test)

            # The last test batch may be smaller than batch_size, so weight each
            # batch by its real size and divide by the real sample count.
            n_batch = Y_test.size(0)
            t_loss += t_cost.item() * n_batch
            t_correct += (torch.argmax(pred, dim = 1) == Y_test).sum().item()
            t_seen += n_batch

    t_loss /= t_seen
    t_acc = t_correct / t_seen
    # Reported in percent, matching the training line above.
    print("[Test] Loss = {:.4f}, acc = {:.2f}".format(t_loss, t_acc*100))

    # Save a checkpoint on the first epoch and whenever the test loss improves.
    if epoch == 0 or t_loss < best_loss:
        torch.save(model.state_dict(), "./weight/{}_loss({:.4f})_ACC({:.4f}).pth".format(epoch+1, t_loss, t_acc))
        best_loss = t_loss

    train_loss.append(loss)
    train_acc.append(acc)
    test_loss.append(t_loss)
    test_acc.append(t_acc)

# Accuracies are stored as fractions in [0, 1]; losses are per-epoch means.
result = pd.DataFrame(list(zip(train_loss,train_acc,test_loss,test_acc)), columns = ["train_loss", "train_acc","test_loss","test_acc"])
result.to_csv("Result1.csv", index = False)


KeyboardInterrupt: ignored

In [None]:
import pandas as pd

# Four tiny sample lists used for scratch experiments.
a, b = [1, 2], [3, 4]
c, d = [5, 6], [7, 8]

In [None]:
# Manual re-save of the results table. This call does not pass index=False,
# unlike the save at the end of the training cell.
result.to_csv("Result1.csv")

tensor(10.0320, device='cuda:0')

In [None]:
# Scratch check: display the label tensor currently bound to Y_test
# (presumably the last batch left over from the evaluation loop).
Y_test

tensor([1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6], device='cuda:0')

DataSet이 잘 연결되는지 확인이 안되네..

In [None]:
import torch

# Split the ten indices 0..9 into subsets of size 3 and 7, with a fixed seed
# so the assignment is reproducible.
t1, t2 = torch.utils.data.random_split(
    dataset=range(10),
    lengths=[3, 7],
    generator=torch.Generator().manual_seed(42),
)


In [None]:
# Show which source indices were assigned to the first subset.
t1.indices

[2, 6, 1]

In [None]:
for 