Resnet으로 컬러 데이터셋에 적용하기 

- 데이터셋이 복잡해질수록 깊게 쌓아 올린 인공 신경망이 더 유리할 수 있다. 
- 직관적으로 생각할 때, 디테일이 많은 데이터는 더욱 많은 분석을 거쳐야 한다. 
- ResNet(residual network)는 CNN을 응용한 모델이다. 

  신경망을 깊게 쌓으면 성능이 나빠지는 문제를 해결하는 방법을 제시했고, DenseNet등의 파생 모델에 영향을 주었다. 
  
  ResNet은 컨볼루션 층의 출력에 전의 전 계층에 쓰였던 입력을 더합으로 특징이 유실되지 않도록 한다. 

In [2]:
#라이브러리 로드 
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, datasets

#쿠다 사용환경 설정
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")

In [5]:
# ResNet
EPOCHS = 200
BATCH_SIZE = 128

#컬러 데이터셋은 흑백 이미지보다 복잡하여 더 많은 학습 필요

In [3]:
#CIFAR-10데이터셋 이용 (32*32 크기)
#컬러 이미지는 RGB의 3채널(RGB)의 픽셀값으로 구성되어 있다. 
#이미지 한개의 입력크기는 32*32*3 =3,072 ( PNG 형식의미지는 투명도까지 4종류의 채널을 가진다. )
#데이터 로드 

train_loader = torch.utils.data.DataLoader(datasets.CIFAR10('./.data',
                                                                train = True,
                                                                download = True, 
                                                                #과적합 방지를 위한 RandomCrop, HorizontalFlip 노이즈 추가 
                                                                transform = transforms.Compose([transforms.RandomCrop(32, padding = 4),
                                                                                                transforms.RandomHorizontalFlip(),
                                                                                                transforms.ToTensor(),
                                                                                               transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
                                                                                               ]))
                                          , batch_size = BATCH_SIZE, shuffle = True)

test_loader = torch.utils.data.DataLoader(datasets.CIFAR10('./.data',
                                                                train = False,
                                                                download = True, 
                                                                transform = transforms.Compose([transforms.ToTensor(),
                                                                                               transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
                                                                                               ]))
                                          , batch_size = BATCH_SIZE, shuffle = True)



Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./.data\cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Extracting ./.data\cifar-10-python.tar.gz to ./.data
Files already downloaded and verified


- 무작정 인공 신경망을 여러개 겹친다고 성능이 무한히 좋아지는 것은 아니다. 
- 여러 단계의 신경망을 거치며 최초 입력 이미지에 대한 정보가 소실되기 때문이다. 
- ResNet의 핵심은 네트워크를 작은 블록인 Residual 블록으로 나누고, 블록의 출력에 입력이었던 x를 더함으로 

  모델을 훨씬 깊게 설게 가능 ( 입,출력 관계를 바로 학습하기 보다 입력과 출력의 차이를 따로 학습하는게 성능이 좋다는 가설 )
- 신경망 깊을 수록 보통 신호의 강도가 감소하는데, ResNet은 입력 데이터를 몇 계층씩 건너뛰어 출력에 더함으로 이현상이 완화된다. 
- ResNet은 층이 많아 복잡한 것처럼 보이지만, Redsidual블록을 반복적으로 쌓은 것이다. 
- Residual 블록을 토치의 BasicBlock으로 정의해서 이용
- 토치에서는 nn.Module을 이용해 모듈위에 또 다른 모듈을 쌓아 올릴 수 있다. 

In [27]:
#self.make_layer()함수를 통해 하나의 모듈로 객체화되어 ResNet모델의 주요 층을 이룬다. 
#self.make_layer()는 시퀀셜 도구로 여러 BasicBlock을 모듈 하나로 묶어주는 역할 ( 반환한 객체는 컨볼루션 계층과 마찬가지로 모듈로 취급 )
class BasicBlock(nn.Module):
    def __init__(self, in_planes, planes, stride = 1):
        super(BasicBlock, self).__init__()
        self.conv1= nn.Conv2d(in_planes, planes, kernel_size = 3, stride = stride, padding = 1, bias = False)
        #배치 정규화 수행 ( 학습률이 높아 기울기가 소실, 발산 예방하기 위해 입력을 평균과 분산으로 정규화 )
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size = 3, stride = 1, padding = 1, bias = False)
        self.bn2 = nn.BatchNorm2d(planes)
        #in_planes를 받아 bn2의 계층의 출력의 크기와 같은 planes와 더해주는 모듈 정의 
        self.shortcut = nn.Sequential() # 시퀀셜은 여러 모듈을 하나로 묶는 역할 
        if stride != 1 or in_planes != planes :
            self.shortcut = nn.Sequential( nn.Conv2d(in_planes, planes, kernel_size = 1, stride = stride, bias = False), nn.BatchNorm2d(planes))
            
    #데이터의 흐름
    #입력 x가 컨볼루션, 배치정규화, 활성화 함수를 거치고, 다시 x를 shortcut에 거치게 하여 크기를 같게 하고 활성화 함수를 거친 값에 더한다. 
    #그리고 Relu활성화 함수를 통과 시켜 최종 출력을 만든다. 
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
    
#모델 정의 
#이미지를 받아 컨볼루션과 배치정규화 층을 거친 후 여러 BasicBlock층을 통과하고 평균 풀링과 신경망을 거쳐 예측 출력 
class ResNet(nn.Module):
    def __init__(self, num_classes = 10):
        super(ResNet, self).__init__()
        self.in_planes = 16
        
        self.conv1 = nn.Conv2d(3, 16, kernel_size = 3, stride = 1, padding = 1, bias = False)
        self.bn1= nn.BatchNorm2d(16)
        #layer2,3 은 채널을 증폭하는 역할을 하는 모듈이다. shortcut모듈을 따로 갖게 되며, 이미지 맥락 보존 역할
        self.layer1= self._make_layer(16, 2, stride = 1)
        self.layer2 = self._make_layer(32, 2, stride = 2)
        self.layer3 = self._make_layer(64, 2, stride = 2)
        self.linear = nn.Linear(64, num_classes)
    
    def _make_layer(self, planes, num_blocks, stride):
        strides = [stride] +[1]*(num_blocks -1)
        layers = []
        for stride in strides :
            layers.append(BasicBlock(self.in_planes, planes, stride))
            self.in_planes = planes
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 8) #평균 풀링
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out

In [28]:
#이번 예제는 학습률 감소 기법 이용(최적화 함수의 학습률을 점점 낮춰 정교하게 최적화 )
#optim.lr_scheduler.StepLR 이용하면 된다. 50epoch마다 학습률에 0.01곱

model = ResNet().to(DEVICE)
optimizer = optim.SGD(model.parameters(), lr = 0.1, momentum = 0.9, weight_decay = 0.0005)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 50, gamma = 0.1)

In [11]:
#모델의 모습
print(model)

ResNet(
  (conv1): Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (shortcut): Sequential()
    )
    (1): BasicBlock(
      (conv1): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=

In [18]:
#모델 train모드, F.corss_entropy오차 함수를 이용하여 모델 출력과 정답인 타겟값 사이 오차 계산
#역전파 알고리즘을 실행해주는 loss.backward()함수를 이용해 기울기 계산 후, optimizer.step() 최적화 함수로 구한 기울기값으로 학습 파라미터 갱신

def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
         #epoch마다 새로운 경사값을 계산하므로 zero_grad()함수를 호출해 경사를 0으로 설정
        optimizer.zero_grad()
        output = model(data)
        loss = F.cross_entropy(output ,target)
        loss.backward()
        optimizer.step()
        

In [19]:
#성능 확인

def evaluate(model, test_loader):
    model.eval()
    #초기값 설정
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)
            
            #배치 오차 합산
            test_loss += F.cross_entropy(output, target, reduction='sum').item()
            
            #가장 높은 값을 가진 인덱스가 예측값
            pred = output.max(1, keepdim = True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
            
    test_loss /= len(test_loader.dataset)
    test_accuracy = 100* correct / len(test_loader.dataset)
    return test_loss, test_accuracy    

In [30]:
#학습
for epoch in range(1, EPOCHS + 1):
    scheduler.step()# 학습률 감소 이용하기 위해 추가
    train(model, train_loader, optimizer, epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)
    
    print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(
          epoch, test_loss, test_accuracy))

# 분석환경이 GPU가 아닌 CPU로 속도가 매우 느려 진행 중단

[1] Test Loss: 0.9930, Accuracy: 64.44%
[2] Test Loss: 0.9355, Accuracy: 66.46%
[3] Test Loss: 0.8987, Accuracy: 70.31%
[4] Test Loss: 0.8550, Accuracy: 71.83%
[5] Test Loss: 0.8605, Accuracy: 71.72%
[6] Test Loss: 0.8614, Accuracy: 71.68%
[7] Test Loss: 0.8095, Accuracy: 73.33%
[8] Test Loss: 0.9344, Accuracy: 70.83%
[9] Test Loss: 0.8688, Accuracy: 71.57%
[10] Test Loss: 0.8338, Accuracy: 72.24%
[11] Test Loss: 0.9612, Accuracy: 68.79%
[12] Test Loss: 0.7017, Accuracy: 77.35%
[13] Test Loss: 0.7565, Accuracy: 74.43%
[14] Test Loss: 0.7917, Accuracy: 74.28%
[15] Test Loss: 0.7566, Accuracy: 75.49%
[16] Test Loss: 0.8557, Accuracy: 72.45%
[17] Test Loss: 0.6882, Accuracy: 77.43%
[18] Test Loss: 0.9411, Accuracy: 70.62%
[19] Test Loss: 1.0378, Accuracy: 66.03%
[20] Test Loss: 0.9185, Accuracy: 70.74%
[21] Test Loss: 0.7676, Accuracy: 76.06%
[22] Test Loss: 0.5675, Accuracy: 80.71%
[23] Test Loss: 0.7182, Accuracy: 75.61%
[24] Test Loss: 0.8948, Accuracy: 71.01%
[25] Test Loss: 0.6141, A

KeyboardInterrupt: 