# Import Modules

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder 
from torch.utils.data import DataLoader

import torch.nn as nn 
import torch.nn.functional as F
import torch.optim as optim

from torchsummary import summary 

In [2]:
import torch

if torch.cuda.is_available():
  DEVICE = torch.device('cuda')
else:
  DEVICE = torch.device('cpu')
print(DEVICE)

cuda


# Load Data

In [3]:
#배치 사이즈 및, 에폭 설정
BATCH_SIZE = 64
EPOCH = 30

In [4]:
#데이터를 dataloader에 담아주는 과정
transform_base = transforms.Compose([transforms.Resize((224,224)),transforms.ToTensor()]) 

train_dataset = ImageFolder(root='datasets/market1501/train', transform=transform_base) 
val_dataset = ImageFolder(root='datasets/market1501/val', transform=transform_base)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset,batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

# Model

In [5]:
class BottleNeck(nn.Module):
    expansion = 4 #블록의 마지막 레이어에서 출력 차원을 4배로 늘린다
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        #Resnet50은 1x1, 3x3, 1x1을 거치는 3개의 컨볼루션 레이어가 하나의 블록으로 구성된다.
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels * BottleNeck.expansion, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels * BottleNeck.expansion),
        )

        self.shortcut = nn.Sequential() #x를 그대로 사용

        self.relu = nn.ReLU()

        #stride가 2 이상이어서 feature map의 크기가 다르거나, 출력층의 채널수가 다른 경우 1x1컨볼루션을 이용해 x의 크기를 맞춰준다.
        if stride != 1 or in_channels != out_channels * BottleNeck.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels*BottleNeck.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels*BottleNeck.expansion)
            )
            
    def forward(self, x):
        x = self.residual_function(x) + self.shortcut(x)
        x = self.relu(x)
        return x

class ResNet(nn.Module):
    def __init__(self, block, num_block, num_classes=4, init_weights=True):
        super().__init__()

        self.in_channels=64

        #7x7의 컨볼루션 레이어와 맥스풀링
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        #3개의 블록이 있으며 3x3 컨볼루션 레이어에서도 stride를 1로 두어 output size에 변화가 없다.
        self.conv2_x = self._make_layer(block, 64, num_block[0], 1)
        #conv3~5에서는 각각 4,6,3개의 블록이 있으며 3x3 컨볼루션 레이어에서 stride를 2로 두어 output size가 절반으로 줄어든다.
        self.conv3_x = self._make_layer(block, 128, num_block[1], 2)
        self.conv4_x = self._make_layer(block, 256, num_block[2], 2)
        self.conv5_x = self._make_layer(block, 512, num_block[3], 2)

        #Average Pooling을 통해 각 채널에서의 평균값 추출
        self.avg_pool = nn.AdaptiveAvgPool2d((1,1))
        #마지막 출력층의 채널 수인 512*4개가 input으로 들어가 클래수의 수만큼 출력한다.
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        #가중치 초기화
        if init_weights:
            self._initialize_weights()

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion

        return nn.Sequential(*layers)

    def forward(self,x):
        output = self.conv1(x)
        output = self.conv2_x(output)
        x = self.conv3_x(output)
        x = self.conv4_x(x)
        x = self.conv5_x(x)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

def resnet50():
    return ResNet(BottleNeck, [3,4,6,3])

# print model summary
model_base = resnet50().to(DEVICE)
summary(model_base, (3, 224, 224)) 

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 112, 112]           9,408
       BatchNorm2d-2         [-1, 64, 112, 112]             128
              ReLU-3         [-1, 64, 112, 112]               0
         MaxPool2d-4           [-1, 64, 56, 56]               0
            Conv2d-5           [-1, 64, 56, 56]           4,096
       BatchNorm2d-6           [-1, 64, 56, 56]             128
              ReLU-7           [-1, 64, 56, 56]               0
            Conv2d-8           [-1, 64, 56, 56]          36,864
       BatchNorm2d-9           [-1, 64, 56, 56]             128
             ReLU-10           [-1, 64, 56, 56]               0
           Conv2d-11          [-1, 256, 56, 56]          16,384
      BatchNorm2d-12          [-1, 256, 56, 56]             512
           Conv2d-13          [-1, 256, 56, 56]          16,384
      BatchNorm2d-14          [-1, 256,

In [6]:
#출력 사이즈 확인
x = torch.randn(50, 3, 224, 224).to(DEVICE)
output = model_base(x)
print(output.size())

torch.Size([50, 4])


In [7]:
# Optimizer, Loss function
optimizer = optim.Adam(model_base.parameters(), lr=0.001) 
criterion = nn.CrossEntropyLoss()

In [8]:
#train
def train(model, train_loader, optimizer):
    model.train()  # 모델 train 상태로
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)   # data, target 값 DEVICE에 할당
        optimizer.zero_grad()                               # optimizer gradient 값 초기화
        output = model(data)                                # 할당된 데이터로 output 계산
        loss = criterion(output, target)                    # Cross Entropy Loss 사용해 loss 계산
        loss.backward()                                     # 계산된 loss back propagation
        optimizer.step()                                    # parameter update

In [9]:
#evaluate
def evaluate(model, test_loader):
    model.eval()      # 모델 평가 상태로
    test_loss = 0     # test_loss 초기화
    correct = 0       # 맞게 예측한 개수. 0 값으로 초기화
    
    with torch.no_grad(): 
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)     # data, target DEVICE에 할당
            output = model(data)                                  # output 계산
            test_loss += criterion(output, target).item()         # loss 계산(총 loss 에 더해주기)
            pred = output.max(1, keepdim=True)[1]                 # 계산된 벡터값 중 가장 큰 값 가지는 class 예측
            correct += pred.eq(target.view_as(pred)).sum().item() # 맞게 예측한 값 세기
   
    test_loss /= len(test_loader.dataset)                         # 평균 loss
    test_accuracy = 100. * correct / len(test_loader.dataset)     # test(validation) 데이터 정확도
    return test_loss, test_accuracy  

## Train Model

In [10]:
import time
import copy
 
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  # best accuracy 초기화
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()                                     # 학습 시간 계산
        train(model, train_loader, optimizer)                   # train 데이터로 학습
        train_loss, train_acc = evaluate(model, train_loader)   # train_loss, train_acc 계산
        val_loss, val_acc = evaluate(model, val_loader)         # valid_loss, valid_acc 계산
        
        if val_acc>best_acc:  # update best accuracy
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since # 학습 시간 출력
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 

    model.load_state_dict(best_model_wts)  
    return model

base = train_baseline(model_base ,train_loader, val_loader, optimizer)  	# 모델 학습시키기
torch.save(base,'model/resnet.pt')      # 모델 저장

-------------- epoch 1 ----------------
train Loss: 0.0216, Accuracy: 60.56%
val Loss: 0.0518, Accuracy: 31.25%
Completed in 0m 58s
-------------- epoch 2 ----------------
train Loss: 0.0146, Accuracy: 67.20%
val Loss: 0.0288, Accuracy: 41.25%
Completed in 0m 58s
-------------- epoch 3 ----------------
train Loss: 0.0129, Accuracy: 67.10%
val Loss: 0.0243, Accuracy: 50.00%
Completed in 0m 58s
-------------- epoch 4 ----------------
train Loss: 0.0117, Accuracy: 69.69%
val Loss: 0.0261, Accuracy: 53.75%
Completed in 0m 58s
-------------- epoch 5 ----------------
train Loss: 0.0110, Accuracy: 70.63%
val Loss: 0.0281, Accuracy: 52.50%
Completed in 0m 58s
-------------- epoch 6 ----------------
train Loss: 0.0112, Accuracy: 71.18%
val Loss: 0.0327, Accuracy: 46.25%
Completed in 0m 58s
-------------- epoch 7 ----------------
train Loss: 0.0107, Accuracy: 71.00%
val Loss: 0.0173, Accuracy: 61.25%
Completed in 0m 58s
-------------- epoch 8 ----------------
train Loss: 0.0089, Accuracy: 77.06%

In [11]:
# test data
transform_base = transforms.Compose([transforms.Resize((224,224)),transforms.ToTensor()]) 
test_base = ImageFolder(root='datasets/market1501/test',transform=transform_base)  
test_loader_base = torch.utils.data.DataLoader(test_base, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)

In [12]:
def predict(model, test_loader):
    model.eval() #eval 모드
    result = None #모델을 거친 확률값(7개 클래스에 대한 softmax)
    targets = None #실제 target
    with torch.no_grad(): 
        for data, target in test_loader:  
            data = data.to(DEVICE)
            #비어있을 경우 그대로 넣어주고
            if result is None:
              result = model(data).cpu().numpy()
              targets = target.cpu().numpy()
            #이미 값이 있을 경우 concat을 통해 추가해준다
            else:
              result = np.concatenate((result, model(data).cpu().numpy()))
              targets = np.concatenate((targets, target))
    return result, targets

## 평가

In [13]:
y_pred_base, y_true_base = predict(base, test_loader_base)

In [14]:
from sklearn.metrics import accuracy_score, confusion_matrix


print("base모델 정확도: {:.3f}".format(accuracy_score(y_pred_base.argmax(axis=1), y_true_base))) #y_pred_base는 확률값이므로 argmax를 통해 예측 클래스를 뽑는다
print("base모델 오차 행렬:\n", confusion_matrix(y_pred_base.argmax(axis=1), y_true_base))

base모델 정확도: 0.713
base모델 오차 행렬:
 [[ 8  1  0  0]
 [12 17  6  1]
 [ 0  1 14  1]
 [ 0  1  0 18]]


In [15]:
from sklearn.metrics import classification_report

print(classification_report(y_pred_base.argmax(axis=1), y_true_base))

              precision    recall  f1-score   support

           0       0.40      0.89      0.55         9
           1       0.85      0.47      0.61        36
           2       0.70      0.88      0.78        16
           3       0.90      0.95      0.92        19

    accuracy                           0.71        80
   macro avg       0.71      0.80      0.71        80
weighted avg       0.78      0.71      0.71        80

