In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.models as models
from torchvision import transforms
import time
from tqdm.autonotebook import tqdm
from torch.utils.data import DataLoader
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import inspect
import torch.nn.functional as F

import matplotlib.pyplot as plt
import numpy as np

In [3]:
# 출처 : https://github.com/weiaicunzai/pytorch-cifar100/blob/master/models/resnext.py
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
cardinality = 32  # number of parallel transformation groups (convolution paths)
depth = 4  # channels per group at the reference width
base_width = 64  # reference channel width that `depth` is defined against


# The grouped-convolution layer of Fig. 3(c) (presumably of the ResNeXt paper) forms
# 32 convolution groups (= cardinality) with 4 input/output channels each, and the
# grouped convolution concatenates those groups as the layer's output.
class ResNextBottleNeck(nn.Module):
    """ResNeXt bottleneck block: 1x1 -> grouped 3x3 -> 1x1 plus a shortcut.

    Args:
        in_channels: Channels of the incoming feature map. The first 1x1
            convolution is grouped as well, so this must be divisible by
            ``cardinality``.
        out_channels: Nominal block width; the block emits ``out_channels * 4``
            channels.
        stride: Stride of the 3x3 convolution (and of the projection shortcut).
    """

    def __init__(self, in_channels, out_channels, stride):
        super().__init__()

        groups = cardinality  # number of groups the feature map is split into

        # Channels per group grow proportionally with the stage width.
        num_depth = depth * out_channels // base_width
        width = groups * num_depth

        # In-place ReLUs avoid allocating an extra activation tensor per call;
        # they act on BatchNorm outputs only, never on the block input.
        self.split_transform = nn.Sequential(
            nn.Conv2d(in_channels, width, kernel_size=1, groups=groups, bias=False),
            nn.BatchNorm2d(width),
            nn.ReLU(inplace=True),
            nn.Conv2d(width, width, kernel_size=3, stride=stride, groups=groups, padding=1, bias=False),
            nn.BatchNorm2d(width),
            nn.ReLU(inplace=True),
            nn.Conv2d(width, out_channels * 4, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels * 4),
        )

        # Identity shortcut unless the spatial size or channel count changes,
        # in which case a strided 1x1 projection matches the shapes.
        self.shortcut = nn.Sequential()

        if stride != 1 or in_channels != out_channels * 4:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * 4, stride=stride, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels * 4)
            )

    def forward(self, x):
        """Return ReLU(transform(x) + shortcut(x))."""
        return F.relu(self.split_transform(x) + self.shortcut(x))
class ResNext(nn.Module):
    """ResNeXt classifier assembled from four stages of bottleneck blocks.

    The stem is a single 3x3, stride-1 convolution with no pooling, so the first
    stage runs at the full input resolution.

    Args:
        block: Block class, called as ``block(in_channels, out_channels, stride)``
            and expected to emit ``out_channels * 4`` channels.
        num_blocks: Sequence of four ints, the number of blocks in each stage.
        class_names: Number of output classes (size of the final linear layer).
            This argument used to be ignored in favour of a hard-coded 10; the
            default is 10 so that existing calls keep producing 10 logits.
        input_channels: Channels of the input images (1 = grayscale).
    """

    def __init__(self, block, num_blocks, class_names=10, input_channels=1):
        super().__init__()
        # Running channel count fed to the next block; updated by _make_layer.
        self.in_channels = 64

        self.conv1 = nn.Sequential(
            nn.Conv2d(input_channels, 64, 3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )

        self.conv2 = self._make_layer(block, num_blocks[0], 64, 1)
        self.conv3 = self._make_layer(block, num_blocks[1], 128, 2)
        self.conv4 = self._make_layer(block, num_blocks[2], 256, 2)
        self.conv5 = self._make_layer(block, num_blocks[3], 512, 2)
        self.avg = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, class_names)

    def forward(self, x):
        """Map a batch of images (N, input_channels, H, W) to logits (N, class_names)."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avg(x)
        x = torch.flatten(x, 1)  # (N, C, 1, 1) -> (N, C)
        x = self.fc(x)
        return x

    def _make_layer(self, block, num_block, out_channels, stride):
        """Build one stage of ResNeXt blocks.

        Args:
            block: Block type (the ResNeXt bottleneck by default usage).
            num_block: Number of blocks in this stage.
            out_channels: Nominal output width of each block.
            stride: Stride of the first block; the remaining blocks use stride 1.

        Returns:
            An ``nn.Sequential`` holding the stage's blocks.
        """
        strides = [stride] + [1] * (num_block - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_channels, out_channels, block_stride))
            # Every block emits four times its nominal width.
            self.in_channels = out_channels * 4

        return nn.Sequential(*layers)
# The stage layouts below have the same form as the corresponding ResNet variants.
def resnext50():
    """Build a ResNext with stages of 3, 4, 6 and 3 bottleneck blocks."""
    stage_depths = [3, 4, 6, 3]
    return ResNext(block=ResNextBottleNeck, num_blocks=stage_depths)


def resnext101():
    """Build a ResNext with stages of 3, 4, 23 and 3 bottleneck blocks."""
    stage_depths = [3, 4, 23, 3]
    return ResNext(block=ResNextBottleNeck, num_blocks=stage_depths)


def resnext152():
    """Build a ResNext with stages of 3, 4, 36 and 3 bottleneck blocks."""
    stage_depths = [3, 4, 36, 3]
    return ResNext(block=ResNextBottleNeck, num_blocks=stage_depths)

In [4]:
def test():
    """Smoke-test resnext101 by printing the output shape for a random batch.

    The model and the input are placed on the same device (GPU when available,
    otherwise CPU). Previously the forward pass ran on the CPU and only the
    result was moved to 'cuda', which fails on machines without CUDA.
    """
    run_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = resnext101().to(run_device)
    net.eval()  # a shape check should not update BatchNorm statistics
    x = torch.randn(3, 1, 224, 224, device=run_device)
    with torch.no_grad():  # no gradients needed for a shape check
        y = net(x)
    print(y.shape)

test()

torch.Size([3, 10])


In [5]:
# Instantiate the ResNeXt-101 variant that the following cells inspect and train.
my_resnet = resnext101()

In [6]:
# Forward a random batch through the network to sanity-check the output shape.
# The tensor is deliberately not named `input`, which would shadow the builtin.
sample_input = torch.randn((4, 1, 224, 224))
with torch.no_grad():  # shape check only; skip building the autograd graph
    output = my_resnet(sample_input)
print(output.shape)

print(my_resnet)

# 결과

torch.Size([4, 10])
ResNext(
  (conv1): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (conv2): Sequential(
    (0): ResNextBottleNeck(
      (split_transform): Sequential(
        (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), groups=32, bias=False)
        (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU()
        (3): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=32, bias=False)
        (4): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU()
        (6): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (shortcut): Sequential(
        (0): Conv2d(64, 256, kernel_s

In [7]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [8]:
def get_data_loaders(train_batch_size, val_batch_size, image_size=224, root="."):
    """Create FashionMNIST training and validation data loaders.

    Images are resized, converted to tensors and normalised with the mean and
    standard deviation of the raw training pixels.

    Args:
        train_batch_size: Batch size of the (shuffled) training loader.
        val_batch_size: Batch size of the (unshuffled) validation loader.
        image_size: Side length the images are resized to. Defaults to 224,
            the value that used to be hard-coded.
        root: Directory the dataset is downloaded to / read from.

    Returns:
        Tuple ``(train_loader, val_loader)``.
    """
    # Raw training pixels, used only to derive the normalisation statistics.
    # `.data` replaces the deprecated `.train_data` alias.
    raw_pixels = torchvision.datasets.FashionMNIST(
        download=True,
        train=True,
        root=root).data.float()

    # Statistics are computed on the 0-255 pixel scale and divided by 255 to
    # match the [0, 1] range produced by ToTensor. `.item()` hands Normalize
    # plain Python floats instead of 0-dim tensors.
    pixel_mean = raw_pixels.mean().item() / 255
    pixel_std = raw_pixels.std().item() / 255

    data_transform = transforms.Compose([  # chain the transforms in order
        transforms.Resize((image_size, image_size)),  # resize to the requested size
        transforms.ToTensor(),  # PIL image / ndarray -> tensor
        transforms.Normalize((pixel_mean,), (pixel_std,))])

    train_loader = DataLoader(torchvision.datasets.FashionMNIST(
        download=True,  # fetch the data if it is not present yet
        root=root,  # where the data is stored
        transform=data_transform,  # transformation applied to each image
        train=True),  # training split
        batch_size=train_batch_size,
        shuffle=True)

    val_loader = DataLoader(torchvision.datasets.FashionMNIST(
        download=False,  # already downloaded above
        root=root,
        transform=data_transform,
        train=False),  # held-out split
        batch_size=val_batch_size,
        shuffle=False)

    return train_loader, val_loader

In [9]:
def calculate_metric(metric_fn, true_y, pred_y):
    """Call ``metric_fn(true_y, pred_y)``, macro-averaging when it supports it.

    Args:
        metric_fn: Metric callable taking ``(true_y, pred_y)``.
        true_y: Ground-truth labels.
        pred_y: Predicted labels.

    Returns:
        Whatever ``metric_fn`` returns. ``average="macro"`` is passed when the
        metric has an ``average`` parameter that can be given by keyword.

    Notes:
        macro: unweighted mean of the per-class scores.
        micro: a single score computed from the global counts.
    """
    # inspect.signature follows functools.wraps chains, whereas
    # inspect.getfullargspec ignores __wrapped__ and therefore cannot see the
    # parameters of a decorated metric function.
    average_param = inspect.signature(metric_fn).parameters.get("average")
    accepts_average = average_param is not None and average_param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )

    if accepts_average:
        return metric_fn(true_y, pred_y, average="macro")
    return metric_fn(true_y, pred_y)

def print_scores(p, r, f1, a, batch_size):
    """Print precision, recall, F1 and accuracy together, one metric per line.

    Args:
        p, r, f1, a: Iterables of precision, recall, F1 and accuracy values.
        batch_size: Divisor applied to the sum of each iterable before printing.
    """
    labelled = {"precision": p, "recall": r, "F1": f1, "accuracy": a}
    for label, values in labelled.items():
        average = sum(values) / batch_size
        print(f"\t{label.rjust(14, ' ')}: {average:.4f}")

In [10]:
# Take the model and place it on the selected device (GPU when available).
model = my_resnet.to(device)

# Number of epochs and batch size.
# NOTE(review): the recorded run of this cell stopped with "CUDA out of memory"
# on an 8 GiB GPU using these settings; lowering batch_size is the first thing
# to try if that happens.
epochs = 5
batch_size = 32

# Data loaders
train_loader, val_loader = get_data_loaders(batch_size, batch_size)

# Loss function: cross-entropy between the predicted class scores and the true
# labels; a standard choice for multi-class classification.
loss_function = nn.CrossEntropyLoss()

# Optimizer: Adam. The network's parameters are handed over through
# nn.Module.parameters(); 3e-4 is the so-called "Karpathy's learning rate".
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

start_ts = time.time()  # wall-clock start time in seconds

losses = []
batches = len(train_loader)
val_batches = len(val_loader)

# One epoch = one training pass followed by one evaluation pass.
for epoch in range(epochs):

    total_loss = 0

    # tqdm renders a progress bar over the training batches.
    progress = tqdm(enumerate(train_loader), desc="Loss: ", total=batches)

    # ----------------- TRAINING  --------------------
    # Put the model into training mode.
    model.train()

    for i, data in progress:
        X, y = data[0].to(device), data[1].to(device)

        # One optimisation step per batch.
        optimizer.zero_grad()  # reset the gradients of all optimised parameters
        outputs = model(X)
        loss = loss_function(outputs, y)
        loss.backward()
        optimizer.step()  # update the parameters

        # item() converts the scalar loss tensor into a Python float.
        current_loss = loss.item()
        total_loss += current_loss

        # Show the running mean loss in the progress bar.
        progress.set_description("Loss: {:.4f}".format(total_loss/(i+1)))

    # Release cached GPU memory before validation to reduce out-of-memory risk.
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # ----------------- VALIDATION  -----------------
    val_losses = 0
    precision, recall, f1, accuracy = [], [], [], []

    # Put the model into evaluation mode.
    model.eval()
    with torch.no_grad():
        for i, data in enumerate(val_loader):
            X, y = data[0].to(device), data[1].to(device)

            outputs = model(X)  # predictions from the network

            # Accumulate a Python float rather than a tensor, so the summary
            # below prints a number instead of a tensor repr.
            val_losses += loss_function(outputs, y).item()

            predicted_classes = torch.max(outputs, 1)[1]  # index of the highest score per sample

            # Copy to the CPU once per batch instead of once per metric.
            y_cpu = y.cpu()
            predicted_cpu = predicted_classes.cpu()

            # Per-batch precision / recall / F1 / accuracy.
            for acc, metric in zip((precision, recall, f1, accuracy),
                                   (precision_score, recall_score, f1_score, accuracy_score)):
                acc.append(
                    calculate_metric(metric, y_cpu, predicted_cpu)
                )

    print(f"Epoch {epoch+1}/{epochs}, training loss: {total_loss/batches}, validation loss: {val_losses/val_batches}")
    print_scores(precision, recall, f1, accuracy, val_batches)
    losses.append(total_loss/batches)  # keep the mean training loss of each epoch
print(f"Training time: {time.time()-start_ts}s")




Loss:   0%|          | 0/1875 [00:00<?, ?it/s]

RuntimeError: CUDA out of memory. Tried to allocate 1.53 GiB (GPU 0; 8.00 GiB total capacity; 6.99 GiB already allocated; 0 bytes free; 7.00 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [12]:
import contextlib

from ptflops import get_model_complexity_info

# Pin the profiling run to GPU 0 only when CUDA is usable. On a CPU-only machine
# entering torch.cuda.device(0) is expected to fail, so use a no-op context there.
device_scope = torch.cuda.device(0) if torch.cuda.is_available() else contextlib.nullcontext()

with device_scope:
    net = my_resnet
    # Per-layer parameter and multiply-accumulate report for a (1, 224, 224) input.
    macs, params = get_model_complexity_info(net, (1, 224, 224), as_strings=True,
                                             print_per_layer_stat=True, verbose=True)
    print('{:<30}  {:<8}'.format('Computational complexity: ', macs))
    print('{:<30}  {:<8}'.format('Number of parameters: ', params))

ResNext(
  25.118 M, 100.000% Params, 70.307 GMac, 100.000% MACs, 
  (conv1): Sequential(
    0.001 M, 0.003% Params, 0.039 GMac, 0.055% MACs, 
    (0): Conv2d(0.001 M, 0.002% Params, 0.029 GMac, 0.041% MACs, 1, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(0.0 M, 0.001% Params, 0.006 GMac, 0.009% MACs, 64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(0.0 M, 0.000% Params, 0.003 GMac, 0.005% MACs, )
  )
  (conv2): Sequential(
    0.134 M, 0.535% Params, 6.782 GMac, 9.647% MACs, 
    (0): ResNextBottleNeck(
      0.056 M, 0.221% Params, 2.8 GMac, 3.983% MACs, 
      (split_transform): Sequential(
        0.039 M, 0.154% Params, 1.952 GMac, 2.777% MACs, 
        (0): Conv2d(0.0 M, 0.001% Params, 0.013 GMac, 0.018% MACs, 64, 128, kernel_size=(1, 1), stride=(1, 1), groups=32, bias=False)
        (1): BatchNorm2d(0.0 M, 0.001% Params, 0.013 GMac, 0.018% MACs, 128, eps=1e-05, momentum=0.1, affine=True, track_running_s