# PART 04. 작물 잎 사진으로 질병 분류하기

In [1]:
# Mount Google Drive so that files under /content/drive/MyDrive are reachable from this Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# Print the runtime's current working directory.
!pwd

/content


In [9]:
!unzip "/content/drive/MyDrive/dataset.zip" -d "/content/drive/MyDrive/dataset/"

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (618).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (790).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (392).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (486).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (1359).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (205).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (1018).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (148).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (107).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (459).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (589).JPG  
  inflating: /content/drive/MyDrive/dataset/Apple___healthy/image (734).JPG  
  inflating:

## 패키지 로드

In [10]:
import os
import shutil
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision import models
from torchvision.datasets import ImageFolder
import time
import copy

## 데이터 분할

* 데이터 분할을 위한 디렉토리 생성

In [11]:
# Source dataset: one sub-directory per class, each holding that class's image files.
original_dataset_dir = '/content/drive/MyDrive/dataset'
# Keep only directories so that stray files in the dataset root are not mistaken for classes.
classes_list = [
    entry for entry in os.listdir(original_dataset_dir)
    if os.path.isdir(os.path.join(original_dataset_dir, entry))
]

# Destination root for the train/val/test copies.
base_dir = '/content/drive/MyDrive/splitted'
train_dir = os.path.join(base_dir, 'train')
validation_dir = os.path.join(base_dir, 'val')
test_dir = os.path.join(base_dir, 'test')

# os.makedirs(..., exist_ok=True) creates every missing directory independently.
# The previous "try: mkdir x3 / except: pass" pattern stopped at the first directory
# that already existed and silently skipped the rest, and it also hid real errors
# such as a missing Drive mount or a permission problem.
for split_dir in (train_dir, validation_dir, test_dir):
    os.makedirs(split_dir, exist_ok=True)
    for cls in classes_list:
        os.makedirs(os.path.join(split_dir, cls), exist_ok=True)


* 데이터 분할과 클래스별 데이터 수 확인

In [12]:
# Copy each class's images into train/val/test folders using a 6:2:2 split.
# NOTE: if an earlier run already populated the split folders with a different file
# ordering, remove base_dir first so that no image ends up in more than one split.
for cls in classes_list:
    path = os.path.join(original_dataset_dir, cls)
    if not os.path.isdir(path):
        # Not a class folder (e.g. a stray file in the dataset root); nothing to split.
        continue

    # os.listdir() returns names in arbitrary order; sorting makes the split
    # reproducible, so re-running the cell copies the same files to the same split.
    fnames = sorted(os.listdir(path))

    # train:validation:test = 6:2:2 split (floor() can leave a few trailing files unused)
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)

    validation_end = train_size + validation_size
    test_end = validation_end + test_size

    train_fnames = fnames[:train_size]
    validation_fnames = fnames[train_size:validation_end]
    test_fnames = fnames[validation_end:test_end]

    # One pass per split: report its size, then copy its files into <split>/<class>/.
    for label, split_dir, split_fnames in (
        ("Train", train_dir, train_fnames),
        ("Validation", validation_dir, validation_fnames),
        ("Test", test_dir, test_fnames),
    ):
        print(label + " size(", cls, "): ", len(split_fnames))
        for fname in split_fnames:
            src = os.path.join(path, fname)
            dst = os.path.join(os.path.join(split_dir, cls), fname)
            shutil.copyfile(src, dst)


Train size( Pepper,_bell___healthy ):  886
Validation size( Pepper,_bell___healthy ):  295
Test size( Pepper,_bell___healthy ):  295
Train size( Grape___Esca_(Black_Measles) ):  829
Validation size( Grape___Esca_(Black_Measles) ):  276
Test size( Grape___Esca_(Black_Measles) ):  276
Train size( Pepper,_bell___Bacterial_spot ):  598
Validation size( Pepper,_bell___Bacterial_spot ):  199
Test size( Pepper,_bell___Bacterial_spot ):  199
Train size( Strawberry___healthy ):  273
Validation size( Strawberry___healthy ):  91
Test size( Strawberry___healthy ):  91
Train size( Grape___Black_rot ):  708
Validation size( Grape___Black_rot ):  236
Test size( Grape___Black_rot ):  236
Train size( Corn___Common_rust ):  715
Validation size( Corn___Common_rust ):  238
Test size( Corn___Common_rust ):  238
Train size( Apple___Apple_scab ):  378
Validation size( Apple___Apple_scab ):  126
Test size( Apple___Apple_scab ):  126
Train size( Potato___healthy ):  91
Validation size( Potato___healthy ):  30


## 베이스라인 모델 학습

* 베이스라인 모델 학습을 위한 준비

In [13]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
BATCH_SIZE = 256  # mini-batch size used by every DataLoader in this notebook
EPOCH = 15  # number of epochs passed to both the baseline and the ResNet training runs

In [14]:
# Baseline preprocessing: resize every image to 64x64 and convert it to a tensor.
transform_base = transforms.Compose([
    transforms.Resize((64,64)),
    transforms.ToTensor(),
])

# ImageFolder derives the class label from the name of each sub-directory.
train_dataset = ImageFolder(
    root='/content/drive/MyDrive/splitted/train',
    transform=transform_base,
)
val_dataset = ImageFolder(
    root='/content/drive/MyDrive/splitted/val',
    transform=transform_base,
)

In [15]:
# Define train_loader and val_loader (mini-batches of BATCH_SIZE, reshuffled every epoch).
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)

* 베이스라인 모델 설계하기

In [16]:
class Net(nn.Module):
    """Baseline CNN classifier for 3-channel 64x64 images.

    Three (conv 3x3 -> ReLU -> 2x2 max-pool -> dropout) stages are followed by
    two fully connected layers. ``fc1`` expects 4096 input features, i.e.
    64 channels x 8 x 8, which is what a 3x64x64 input yields after the three
    2x2 poolings.

    Args:
        num_classes: number of output classes (defaults to 33, the number of
            classes in this task).
    """

    # Define the layers up front
    def __init__(self, num_classes=33):

        super(Net, self).__init__()

        # nn.Conv2d(in_channels, out_channels, kernel_size)
        # Stacked convolutions must chain: if conv1 outputs 32 channels,
        # conv2 must take 32 input channels.
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2,2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)

        self.fc1 = nn.Linear(4096, 512)
        self.fc2 = nn.Linear(512, num_classes)  # one output per class (33 by default)

    # Key part: how the layers are actually wired together
    def forward(self, x):
        """Return per-class log-probabilities of shape (batch, num_classes).

        Raises:
            RuntimeError: if the flattened feature size does not match ``fc1``
                (for example when the input images are not 64x64).
        """

        x = self.conv1(x)  # apply the conv1 layer defined above
        x = F.relu(x)  # ReLU activation
        x = self.pool(x)  # max pooling
        x = F.dropout(x, p = 0.25, training = self.training)

        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p = 0.25, training = self.training)

        x = self.conv3(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p = 0.25, training = self.training)

        # Flatten per sample and keep the batch dimension fixed. The previous
        # view(-1, 4096) silently merged samples together whenever the feature
        # map was not exactly 4096 values per image.
        x = x.view(x.size(0), -1)
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

# Instantiate the baseline network on DEVICE and optimise all of its parameters with Adam.
model_base = Net().to(DEVICE)
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

* 모델 학습을 위한 함수

In [19]:
# Key part
def train(model, train_loader, optimizer):
    """Run one optimisation pass over ``train_loader``.

    Puts ``model`` in training mode and, for every mini-batch, moves the batch
    to the module-level ``DEVICE``, computes the cross-entropy loss,
    back-propagates it and applies one optimizer step. Returns nothing.
    """
    model.train()
    for inputs, labels in train_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        optimizer.zero_grad()  # reset accumulated gradients
        # cross-entropy between the model's prediction and the ground-truth labels
        batch_loss = F.cross_entropy(model(inputs), labels)
        batch_loss.backward()  # back-propagate the error
        optimizer.step()  # update the model parameters



* 모델 평가를 위한 함수

In [20]:
def evaluate(model, test_loader):
    """Compute the mean cross-entropy loss and accuracy over ``test_loader``.

    The model is switched to eval mode and gradients are disabled. Batches are
    moved to the module-level ``DEVICE`` before the forward pass.

    Returns:
        (loss, accuracy): the summed loss divided by the dataset size, and the
        percentage (0-100) of correctly classified samples.
    """
    model.eval()
    loss_total = 0
    n_correct = 0

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(DEVICE)
            labels = labels.to(DEVICE)
            scores = model(inputs)

            # accumulate the un-averaged loss so the final mean is per sample
            loss_total += F.cross_entropy(scores, labels, reduction='sum').item()

            # index of the highest score is the predicted class
            _, predicted = scores.max(1)
            n_correct += (predicted == labels).sum().item()

    n_samples = len(test_loader.dataset)
    return loss_total / n_samples, 100. * n_correct / n_samples

* 모델 학습을 실행하기

In [21]:
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    """Train ``model`` for ``num_epochs`` epochs and keep the best weights.

    After every epoch the model is evaluated on both loaders; whenever the
    validation accuracy beats the best one seen so far, a deep copy of the
    state dict is stored. Per-epoch metrics and wall-clock time are printed.

    Returns:
        ``model`` itself, with the best-validation weights loaded.
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Key part: one train pass followed by evaluation on both splits
    for epoch in range(1, num_epochs + 1):
        epoch_start = time.time()
        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)

        # snapshot the weights whenever validation accuracy improves
        if val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

        elapsed = time.time() - epoch_start
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(*divmod(elapsed, 60)))

    model.load_state_dict(best_model_wts)
    return model


# Train the baseline for EPOCH epochs; `base` is model_base with the best-validation weights loaded.
base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)
# Saves the whole model object (not just its state_dict) to the working directory.
torch.save(base,'baseline.pt')

-------------- epoch 1 ----------------
train Loss: 1.5140, Accuracy: 57.26%
val Loss: 1.5253, Accuracy: 56.82%
Completed in 2m 8s
-------------- epoch 2 ----------------
train Loss: 1.0299, Accuracy: 69.40%
val Loss: 1.0567, Accuracy: 67.72%
Completed in 2m 6s
-------------- epoch 3 ----------------
train Loss: 0.7872, Accuracy: 75.54%
val Loss: 0.8213, Accuracy: 74.45%
Completed in 1m 58s
-------------- epoch 4 ----------------
train Loss: 0.5535, Accuracy: 82.73%
val Loss: 0.5987, Accuracy: 80.62%
Completed in 1m 58s
-------------- epoch 5 ----------------
train Loss: 0.4775, Accuracy: 85.17%
val Loss: 0.5309, Accuracy: 83.20%
Completed in 1m 57s
-------------- epoch 6 ----------------
train Loss: 0.3957, Accuracy: 87.92%
val Loss: 0.4632, Accuracy: 85.03%
Completed in 1m 57s
-------------- epoch 7 ----------------
train Loss: 0.3428, Accuracy: 89.63%
val Loss: 0.4132, Accuracy: 86.72%
Completed in 1m 58s
-------------- epoch 8 ----------------
train Loss: 0.3159, Accuracy: 90.70%
v

## Transfer Learning 모델 학습

* Transfer Learning을 위한 준비

In [22]:
# Per-phase preprocessing for the transfer-learning model.
# Both phases resize to 64x64, crop to 52x52 and normalise with the fixed
# per-channel mean/std given below.
data_transforms = {
    # Training: random flips and a random crop act as data augmentation.
    'train': transforms.Compose([transforms.Resize([64,64]),
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),

    # Validation: a deterministic centre crop, so validation metrics (and the
    # best-checkpoint selection that depends on them) do not vary with random
    # crop positions from one evaluation to the next.
    'val': transforms.Compose([transforms.Resize([64,64]),
        transforms.CenterCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}

In [23]:
# Build one dataset / loader / size entry per phase, keyed by 'train' and 'val'.
data_dir = '/content/drive/MyDrive/splitted'

image_datasets = {
    phase: ImageFolder(
        root=os.path.join(data_dir, phase),
        transform=data_transforms[phase],
    )
    for phase in ['train', 'val']
}
dataloaders = {
    phase: DataLoader(
        image_datasets[phase],
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=2,
    )
    for phase in ['train', 'val']
}
dataset_sizes = {phase: len(image_datasets[phase]) for phase in ['train', 'val']}

# Class names as discovered by ImageFolder from the training directory.
class_names = image_datasets['train'].classes

* Pre-Trained Model 불러오기

In [24]:
# Key part: reuse the pre-trained ResNet and replace only the final fc layer
# so that it outputs the 33 classes of this task.

# Load resnet18 together with its pre-trained weights.
resnet = models.resnet18(pretrained=True)
# Swap the classification head: same input feature count, 33 outputs (fine-tuning).
num_ftrs = resnet.fc.in_features
resnet.fc = nn.Linear(num_ftrs, 33)
resnet = resnet.to(DEVICE)

criterion = nn.CrossEntropyLoss()
# Only parameters with requires_grad=True are handed to Adam.
# NOTE(review): nothing has been frozen yet when this cell runs (the freeze cell
# comes later), so this filter currently passes every parameter.
trainable_params = (p for p in resnet.parameters() if p.requires_grad)
optimizer_ft = optim.Adam(trainable_params, lr=0.001)

# Multiply the learning rate by 0.1 every 7 scheduler steps.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 159MB/s]


* Pre-Trained Model의 일부 Layer Freeze하기

In [25]:
# Freeze the first five child modules of the network: with requires_grad set to
# False their weights receive no gradient updates and stay fixed.
for ct, child in enumerate(resnet.children(), start=1):
    if ct >= 6:
        continue
    for param in child.parameters():
        param.requires_grad = False

* Transfer Learning 모델 학습과 검증을 위한 함수

In [26]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):
    """Fine-tune ``model`` and return it with the best-validation weights loaded.

    Each epoch runs a 'train' phase followed by a 'val' phase. Batches come from
    the module-level ``dataloaders`` dict, per-phase sample counts from
    ``dataset_sizes``, and tensors are moved to the module-level ``DEVICE``.
    The scheduler is stepped once per epoch, after the training phase. The state
    dict is snapshotted whenever validation accuracy improves.
    """
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print('-------------- epoch {} ----------------'.format(epoch+1))
        since = time.time()
        for phase in ('train', 'val'):
            is_train = phase == 'train'
            # train(True) enables training mode; train(False) is what eval() does
            model.train(is_train)

            loss_sum = 0.0
            hits = 0

            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)

                optimizer.zero_grad()

                # gradients are only tracked during the training phase
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    preds = torch.max(outputs, 1)[1]
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # weight the batch-mean loss by batch size to get a per-sample sum
                loss_sum += loss.item() * inputs.size(0)
                hits += torch.sum(preds == labels.data)
            if is_train:
                scheduler.step()

            n_samples = dataset_sizes[phase]
            epoch_loss = loss_sum / n_samples
            epoch_acc = hits.double() / n_samples

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # keep a copy of the weights whenever validation accuracy improves
            if not is_train and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        time_elapsed = time.time() - since
        print('Completed in {:.0f}m {:.0f}s'.format(*divmod(time_elapsed, 60)))
    print('Best val Acc: {:4f}'.format(best_acc))

    model.load_state_dict(best_model_wts)

    return model

* 모델 학습을 실행하기

In [27]:
# Fine-tune the ResNet for EPOCH epochs; the returned model carries the best-validation weights.
model_resnet = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH)

# Saves the whole model object (not just its state_dict) to the working directory.
torch.save(model_resnet, 'resnet.pt')

-------------- epoch 1 ----------------
train Loss: 0.6127 Acc: 0.8110
val Loss: 0.3269 Acc: 0.8901
Completed in 1m 17s
-------------- epoch 2 ----------------
train Loss: 0.2408 Acc: 0.9197
val Loss: 0.3367 Acc: 0.8929
Completed in 1m 15s
-------------- epoch 3 ----------------
train Loss: 0.1689 Acc: 0.9435
val Loss: 0.1930 Acc: 0.9383
Completed in 1m 19s
-------------- epoch 4 ----------------
train Loss: 0.1473 Acc: 0.9517
val Loss: 0.2031 Acc: 0.9385
Completed in 1m 17s
-------------- epoch 5 ----------------
train Loss: 0.1227 Acc: 0.9593
val Loss: 0.1583 Acc: 0.9473
Completed in 1m 15s
-------------- epoch 6 ----------------
train Loss: 0.1247 Acc: 0.9585
val Loss: 0.1610 Acc: 0.9457
Completed in 1m 16s
-------------- epoch 7 ----------------
train Loss: 0.0964 Acc: 0.9670
val Loss: 0.1168 Acc: 0.9636
Completed in 1m 19s
-------------- epoch 8 ----------------
train Loss: 0.0536 Acc: 0.9826
val Loss: 0.0676 Acc: 0.9771
Completed in 1m 15s
-------------- epoch 9 ----------------


## 모델 평가

* 베이스라인 모델 평가를 위한 전처리하기

In [28]:
# Test-time preprocessing for the baseline: resize to 64x64 and convert to a tensor.
transform_base = transforms.Compose([
    transforms.Resize([64,64]),
    transforms.ToTensor(),
])
test_base = ImageFolder(
    root='/content/drive/MyDrive/splitted/test',
    transform=transform_base,
)
test_loader_base = DataLoader(
    test_base,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)

* Transfer Learning모델 평가를 위한 전처리하기

In [29]:
# Test-time preprocessing for the transfer-learning model.
# A deterministic CenterCrop replaces RandomCrop so that the reported test
# accuracy is the same on every evaluation instead of depending on random
# crop positions.
transform_resNet = transforms.Compose([
        transforms.Resize([64,64]),
        transforms.CenterCrop(52),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

test_resNet = ImageFolder(root='/content/drive/MyDrive/splitted/test', transform=transform_resNet)
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

* 베이스라인 모델 성능 평가하기

In [30]:
# Reload the saved baseline model and measure its accuracy on the test split.
# map_location=DEVICE lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
# NOTE(review): 'baseline.pt' holds a whole pickled model; PyTorch releases whose
# torch.load defaults to weights_only=True need weights_only=False here. Confirm
# against the installed version, and only load checkpoints you created yourself.
baseline=torch.load('baseline.pt', map_location=DEVICE)
baseline.eval()
test_loss, test_accuracy = evaluate(baseline, test_loader_base)

print('baseline test acc:  ', test_accuracy)

baseline test acc:   91.82626110902491


* Transfer Learning 모델 성능 평가하기

In [31]:
# Reload the saved fine-tuned ResNet and measure its accuracy on the test split.
# map_location=DEVICE lets a checkpoint saved on a GPU be loaded on a CPU-only runtime.
# NOTE(review): 'resnet.pt' holds a whole pickled model; PyTorch releases whose
# torch.load defaults to weights_only=True need weights_only=False here. Confirm
# against the installed version, and only load checkpoints you created yourself.
resnet=torch.load('resnet.pt', map_location=DEVICE)
resnet.eval()
test_loss, test_accuracy = evaluate(resnet, test_loader_resNet)

print('ResNet test acc:  ', test_accuracy)

ResNet test acc:   98.38527975966954
