In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!unzip -qq '/content/drive/MyDrive/제로베이스_코랩/dataset.zip' -d '/content/drive/MyDrive/제로베이스_코랩/dataset'
# -d : 압축파일을 푸는 경로를 지정하는 옵션

In [None]:
import os

original_dataset_dir = '/content/drive/MyDrive/제로베이스_코랩/dataset'
classes_list = os.listdir(original_dataset_dir)

base_dir = '/content/drive/MyDrive/제로베이스_코랩/splitted'
os.mkdir(base_dir)

데이터 정리를 위한 목록 및 폴더 생성
- 하기의 폴더들은 모두 base_dir의 splitted 폴더 안에 생성

In [None]:
import shutil

# train, validation, test의 directory 만들기
train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)

validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)

test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

# classes_list : 폴더의 목록이 있는 리스트
# for문을 통해 train, validation, test의 directory 안에 classes_list 동일 폴더 만들기
for cls in classes_list:
  os.mkdir(os.path.join(train_dir, cls))
  os.mkdir(os.path.join(validation_dir, cls))
  os.mkdir(os.path.join(test_dir, cls))

데이터 현황 확인

In [None]:
import math

# classes_list의 cls(폴더명)이 하나씩 돌면서 join된 original_dataset_dir인 dataset의 파일이 fnames에 하나씩 담긴다
# ex) dataset 중 classes_list의 cls인 Apple__Apple_scab가 path에 담기고
# path에 의해 Apple__Apple_scab의 파일인 image(107), image(108).. 등이 fnames에 담긴다
for cls in classes_list:
  path = os.path.join(original_dataset_dir, cls)
  fnames = os.listdir(path)

  # train, validation, test 데이터 나누기 (6:2:2)
  train_size = math.floor(len(fnames) * 0.6)
  validation_size = math.floor(len(fnames) * 0.2)
  test_size = math.floor(len(fnames) * 0.2)

  train_fnames = fnames[:train_size]
  print('Train size(",cls,"):', len(train_fnames))
  for fname in train_fnames:
    scr = os.path.join(path, fname)
    dst = os.path.join(os.path.join(train_dir, cls), fname)
    shutil.copyfile(scr, dst) # 복사를 위한 코드

  validation_fnames = fnames[train_size:(validation_size + train_size)]
  print('Validation size(",cls,"):', len(validation_fnames))
  for fname in validation_fnames:
    src = os.path.join(path, fname)
    dst = os.path.join(os.path.join(validation_dir, cls), fname)
    shutil.copyfile(scr, dst)

  test_fnames = fnames[(validation_size + train_size):(validation_size + train_size + test_size)]
  print('Test size(",cls,"):', len(test_fnames))
  for fname in test_fnames:
    src = os.path.join(path, fname)
    dst = os.path.join(os.path.join(test_dir, cls), fname)
    shutil.copyfile(scr, dst)



Train size(",cls,"): 886
Validation size(",cls,"): 295
Test size(",cls,"): 295
Train size(",cls,"): 829
Validation size(",cls,"): 276
Test size(",cls,"): 276
Train size(",cls,"): 598
Validation size(",cls,"): 199
Test size(",cls,"): 199
Train size(",cls,"): 273
Validation size(",cls,"): 91
Test size(",cls,"): 91
Train size(",cls,"): 708
Validation size(",cls,"): 236
Test size(",cls,"): 236
Train size(",cls,"): 715
Validation size(",cls,"): 238
Test size(",cls,"): 238
Train size(",cls,"): 378
Validation size(",cls,"): 126
Test size(",cls,"): 126
Train size(",cls,"): 91
Validation size(",cls,"): 30
Test size(",cls,"): 30
Train size(",cls,"): 600
Validation size(",cls,"): 200
Test size(",cls,"): 200
Train size(",cls,"): 512
Validation size(",cls,"): 170
Test size(",cls,"): 170
Train size(",cls,"): 1276
Validation size(",cls,"): 425
Test size(",cls,"): 425
Train size(",cls,"): 372
Validation size(",cls,"): 124
Test size(",cls,"): 124
Train size(",cls,"): 631
Validation size(",cls,"): 210
T

In [None]:
import torch
import os

USE_CODE = torch.cuda.is_available
DEVICE = torch.device('cuda' if USE_CODE else 'cpu')
BATCH_SIZE = 256
EPOCH = 30

ImageFolder : 폴더 이름을 라벨로 작업해주는 메써드

In [None]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

transfrom_base = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])
train_dataset = ImageFolder(root='/content/drive/MyDrive/제로베이스_코랩/splitted/train', transform = transfrom_base)
val_dataset = ImageFolder(root='/content/drive/MyDrive/제로베이스_코랩/splitted/val', transform = transfrom_base)

In [None]:
from torch.utils.data import DataLoader

train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size = BATCH_SIZE,
                                           shuffle = True,
                                           num_workers = 4)
val_loader = torch.utils.data.DataLoader(val_dataset,
                                          batch_size = BATCH_SIZE,
                                          shuffle = True,
                                          num_workers = 4)

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):

  def __init__(self):

    super(Net, self).__init__()

    self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
    self.pool = nn.MaxPool2d(2, 2)
    self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
    self.conv3 = nn.Conv2d(64, 64, 3, padding=1)

    self.fc1 = nn.Linear(4096, 512)
    self.fc2 = nn.Linear(512, 33)

  def forward(self, x):
    x = self.conv1(x)
    x = F.relu(x)
    x = self.pool(x)
    x = F.dropout(x, p = 0.25, training=self.training)
    # training=self.training : 학습할때만 학습율 사용, 테스트할 때는 미사용하겠다는 설정

    x = self.conv2(x)
    x = F.relu(x)
    x = self.pool(x)
    x = F.dropout(x, p = 0.25, training=self.training)

    x = self.conv3(x)
    x = F.relu(x)
    x = self.pool(x)
    x = F.dropout(x, p = 0.25, training=self.training)

    x = x.view(-1, 4096) # platten()과 같은 기능
    x = self.fc1(x)
    x = F.relu(x)
    x = F.dropout(x, p = 0.25, training=self.training)
    x = self.fc2(x)

    return F.log_softmax(x, dim=1)


In [None]:
model_base = Net().to(DEVICE)
optimizer = optim.Adam(model_base.parameters(), lr = 0.001)

In [None]:
def train(model, train_loader, opmtimizer):
  model.train()
  for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(DEVICE), target.to(DEVICE)
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()

In [None]:
def evaluate(model, test_loader):
  model.eval()
  test_loss = 0
  correct = 0
  # with 구문은 끝나면 resorce가 저절로 닫힌다
  with torch.no_grad():
    for data, target in test_loader:
      data, target = data.to(DEVICE), target.to(DEVICE)
      output = model(data)

      test_loss += F.cross_entropy(output, target, reduction='sum').item()

      pred = output.max(1, keepdim = True)[1]
      correct += pred.eq(target.view_as(pred)).sum().item()

  test_loss /= len(test_loader.dataset)
  # /= : 왼쪽 변수에 오른쪽 값을 나누고 그 결과를 왼쪽 변수에 할당
  # ex) a /= b  -> a = a/b
  test_accuracy = 100. * correct / len(test_loader.dataset)
  return test_loss, test_accuracy

### state_dict
- PyTorch의 state_dict는 모델의 매개변수(parameters)와 옵티마이저(optimizer)의 상태(state)를 담고 있는 사전(dictionary) 객체
- state_dict는 모델을 저장하고 불러올 때 매우 유용한 포맷

In [None]:
import time
import copy

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs = 30):
  best_acc = 0.0
  # wts = Weights
  best_model_wts = copy.deepcopy(model.state_dict())

  for epoch in range(1, num_epochs + 1):
    since = time.time()
    train(model, train_loader, optimizer)
    train_loss, train_acc = evaluate(model, train_loader)
    val_loss, val_acc = evaluate(model, val_loader)

    if val_acc > best_acc:
      best_acc = val_acc
      best_model_wts = copy.deepcopy(model.state_dict())

      time_elapsed = time.time() - since
      print('------------- epoch {} ---------------'.format(epoch))
      print('Train Loss : {:.4f}, Accuracy : {:.2f}%'.format(train_loss, train_acc))
      print('Val Loss : {:.4f}, Accuracy : {:.2f}%'.format(val_loss, val_acc))
      print('Completed in : {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

  model.load_state_dict(best_model_wts)
  return model

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
torch.save(base, 'baseline.pt')


KeyboardInterrupt: 

### 전이학습 : 잘 학습된 모델의 가중치를 가져와서 원하는 방향으로 조금만 바꿔서 가중치를 학습시키는 것
- 입력단에 가까운 레이어일 수록 conventional layer, 출력단에 가까운 레이어일 수록 fully connected layer
- data_transforms : 이미지폴더의 함수에 옵션으로 지정
- Compose : 전처리 함수, 이미지 증강 설정을 한 번에 합치기
- 이미지 증강 : 심층 신경망에서 데이터셋을 사용할 경우 대규모 데이터셋이더라도 과적합이 발생할 수 있다. 이를 방지하고자 이미지 증강을 통해 모델의 의존도를 줄일 수 있고, 이에 따른 일반화 능력이 향상된다. 따라서 심층 신경망의 성공적인 학습을 위해 목적에 위배되지 않는다면(필수로 지켜야되는 조건들은 변경하지 않는다면) 동일한 이미지를 위치, 밝기, 색도와 같은 요소들을 무작위로 조절하면서 다양한 데이터를 만드는 것이다.
- https://jungnamgyu.tistory.com/32 참고

In [None]:
data_transforms = {
    'train' : transforms.Compose([transforms.Resize([64, 64]),
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
        # RandomHorizontalFlip() : 이미지 뒤집기, 좌우반전
        # RandomVerticalFlip() : 이미지 뒤집기, 상하반전
        transforms.RandomCrop(52), transforms.ToTensor(),
        # RandomCrop : 이미지 자르기, 전체에서 일부를 보여주는 효과
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),
        # Normalize : RGB 색상값의 평균값과 표준편차 지정, 색상 변경은 학습능력이 좋아진다
    'val' : transforms.Compose([transforms.Resize([64, 64]),
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}

In [None]:
data_dir = '/content/drive/MyDrive/제로베이스_코랩/splitted'
image_datasets = {x : ImageFolder(root=os.path.join(data_dir, x),
                                  transform=data_transforms[x]) for x in ['train', 'val']}
dataloaders = {x : torch.utils.data.DataLoader(image_datasets[x],
                                               batch_size = BATCH_SIZE,
                                               shuffle = True,
                                               num_workers = 4) for x in ['train', 'val']}
dataset_size = {x : len(image_datasets[x]) for x in ['train', 'val']}

class_names = image_datasets['train'].classes

### resnet50 학습 후 출력되는 class 숫자와 현재 모델에서 예측해야되는 숫자가 다르므로 조정 필요

In [None]:
from torchvision import models
# torchvision의 models를 통해 여러 모듈을 import

resnet = models.resnet50(pretrained=True)
# pretrained = True 시 학습이 완료된 모듈을 가져오고, False 시 모듈의 구조만 가져오도록 설정하는 메써드
num_ftrs = resnet.fc.in_features
                    # in_features : 마지막 레이어의 채널 숫자를 의미
                    # 여기선 resnet50의 마지막 레이어의 채널 숫자를 의미
                    # 이를 num_ftrs 변수에 지정
resnet.fc = nn.Linear(num_ftrs, 33)
            # resnet50의 마지막 레이어 채널 숫자를 33개로 수정
resnet = resnet.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr = 0.001)

from torch.optim import lr_scheduler
# lr_scheduler : epoch 마다 learning_rate를 조정
# 여기선 7 epoch 마다 0.1씩 learning_rate를 감소
# 가중치 업데이트를 크게 가져가고 learning_rate를 감소키면서 점점 작게(=세밀하게) 가져간다

exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
ct = 0
for child in resnet.children():
  # resnet는 원래 10개의 layer 존재
  ct += 1
  if ct < 6:
    for param in child.parameters():
      param.requires_grad = False
  # 여기선 입력에 가까운 0~5번까지 layer는 frezze(고정) 시키고(=기존에 학습이 잘되었다고 판단하여 학습시키지 않고)
  # 6번째 layer부터 학습을 진행시키고자 하는 설정

In [None]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):

  best_model_wts = copy.deepcopy(model.state_dict())
  best_acc = 0.0

  for epoch in range(num_epochs):
    print('------------- epoch{} -------------'.format(epoch + 1))
    since = time.time()
    for phase in ['train', 'val']:
      if phase == 'train':
        model.train()
      else:
        model.eval()

      running_loss = 0.0
      running_corrects = 0

      for inputs, labels in dataloaders[phase]:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()

        with torch.set_grad_enabled(phase == 'train'):
          # torch.set_grad_enabled(phase == 'train') : trian 폴더의 데이터라면(=train loader라면) gradient를 학습하는 것을 허락한다, 웨이트가 업데이트 되는 것을 허락한다
          outputs = model(inputs)
          _, preds = torch.max(outputs, 1)
          # outputs은 one_hot_encoding 형태
          # torch.max() 함수는 첫 번째 반환 값으로 최대 값들을, 두 번째 반환 값으로 해당 최대 값들의 인덱스를 반환
          # _, preds 와 같이 2개의 인자 필요
          loss = criterion(outputs, labels)

          if phase == 'train' :
            loss.backward()
            optimizer.step()

        # train이 아니라면 with 구문은 미실행

          running_loss += loss.item() * inputs.size(0)
          running_corrects += torch.sum(preds == labels.data)

      if phase == 'train':
        scheduler.step()

      epoch_loss = running_loss / dataset_size[phase]
      epoch_acc = running_corrects.double() / dataset_size[phase]

      print('{} Loss : {:.4f} Acc : {:.4f}'.format(phase, epoch_loss, epoch_acc))

      if phase == 'val' and epoch_acc > best_acc:
        best_acc = epoch_acc
        best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
  print('Best val Acc : {:.4f}'.format(best_acc))

  model.load_state_dict(best_model_wts)

  return model

In [None]:
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler,
                              num_epochs=EPOCH)
torch.save(model_resnet50, 'resnet50.pt')

------------- epoch1 -------------
train Loss : 0.6069 Acc : 0.8206
val Loss : 0.3421 Acc : 0.9007
Complete in 2m 6s
------------- epoch2 -------------
train Loss : 0.2201 Acc : 0.9298
val Loss : 0.1981 Acc : 0.9330
Complete in 1m 53s
------------- epoch3 -------------
train Loss : 0.1660 Acc : 0.9463
val Loss : 0.2001 Acc : 0.9365
Complete in 1m 52s
------------- epoch4 -------------
train Loss : 0.1311 Acc : 0.9574
val Loss : 0.1992 Acc : 0.9259
Complete in 1m 54s
------------- epoch5 -------------
train Loss : 0.1178 Acc : 0.9623
val Loss : 0.1032 Acc : 0.9568
Complete in 1m 51s
------------- epoch6 -------------
train Loss : 0.0998 Acc : 0.9678
val Loss : 0.1330 Acc : 0.9409
Complete in 1m 53s
------------- epoch7 -------------
train Loss : 0.0841 Acc : 0.9721
val Loss : 0.1886 Acc : 0.9403
Complete in 1m 52s
------------- epoch8 -------------
train Loss : 0.0454 Acc : 0.9856
val Loss : 0.0406 Acc : 0.9872
Complete in 1m 53s
------------- epoch9 -------------
train Loss : 0.0275 Ac

In [None]:
transform_resNet = transforms.Compose([
    transforms.Resize([64, 64]),
    transforms.RandomCrop(52),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.228, 0.224, 0.225])
])

test_resNet = ImageFolder(root='/content/drive/MyDrive/제로베이스_코랩/splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet,
                                                 batch_size=BATCH_SIZE,
                                                 shuffle=True,
                                                 num_workers=4)

In [None]:
resnet50 = torch.load('resnet50.pt')
resnet50.eval()
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)

print("ResNet test acc :", test_accuracy)

ResNet test acc : 99.29903617474027
