# 작물 잎 사진으로 질병 분류하기

## 작물 잎 사진의 종류와 질병 유무를 분류하는 모델 설계, 학습

## 1. 데이터 분할

### 데이터셋을 학습용, 검증용, 테스트용으로 분리

In [None]:
import os
import shutil # 파일과 파일 모음에 대한 여러 가지 고수준 연산을 제공(파일 복사, 삭제 등)

# 원래 데이터셋이 있는 경로
original_dataset_dir = '/Users/minguinho/Documents/AI_Datasets/crop_leaf_original_dataset'
classes_list = os.listdir(original_dataset_dir) # 원래 데이터셋에 클래스 별로 데이터가 저장되어 있다. 각 클래스별 폴더 경로를 classes_list에 저장한다 

# 분류할 데이터를 저장할 곳
base_dir = '/Users/minguinho/Documents/AI_Datasets/crops_leaf_dataset'
os.mkdir(base_dir)

# 학습, 검증, 테스트용 데이터 저장할 곳 생성
train_dir = os.path.join(base_dir, 'train')
os.mkdir(train_dir)

validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)

test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)


# 클래스별로 사진을 저장하기 위해 폴더 생성
for clss in classes_list :
    os.mkdir(os.path.join(train_dir, clss))
    os.mkdir(os.path.join(validation_dir, clss))
    os.mkdir(os.path.join(test_dir, clss))

In [None]:
import math

# 클래스별로 저장된 사진들을 학습, 검증, 테스트용 데이터셋에 저장한다
for clss in classes_list:
    path = os.path.join(original_dataset_dir, clss) # 원래 데이터셋/클래스이름
    fnames = os.listdir(path) # 클래스별로 저장되어있는 파일의 경로들을 여기에 저장한다. 예) 건강한 토마토 잎들의 경로들을 fnames에 저장 

    # math.floor() : 지정된 숫자보다 작거나 같은 가장 큰 정수를 반환
    train_size = math.floor(len(fnames) * 0.6)
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)

    # 파일을 학습, 검증, 테스트용으로 나눠서 저장
    train_fnames = fnames[:train_size]
    print('Train size(', clss, ') : ', len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(train_dir, clss), fname) # 데이터셋/클래스이름/파일명
        shutil.copyfile(src, dst) # 복사해서 저장

    validation_fnames = fnames[train_size:(train_size + validation_size)]
    print('Validation size(', clss, ') : ', len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, clss), fname)
        shutil.copyfile(src, dst) # 복사해서 저장

    test_fnames = fnames[validation_size:(train_size + validation_size + test_size)]
    print('Test size(', clss, ') : ', len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, clss), fname)
        shutil.copyfile(src, dst) # 복사해서 저장

# 데이터 전처리

In [None]:
import torch

USE_CUDA = torch.coda.is_avaliable() # CUDA를 사용할 수 있는가?
DEVICE = torch.device('cuda' if USE_CUDA else 'cpu') # CUDA를 사용할 수 있으면 CUDA를 사용하고 아니면 cpu 사용

BATCH_SIZE = 256
EPOCH = 30

import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder

transform_base = transforms.Compose([transforms.Resize((64, 64)), transforms.ToTensor()])

train_dataset = ImageFolder(root = '/Users/minguinho/Documents/AI_Datasets/crops_leaf_dataset/train', transform = transform_base)
val_dataset   = ImageFolder(root = '/Users/minguinho/Documents/AI_Datasets/crops_leaf_dataset/val', transform = transform_base)

from torch.utils.data import DataLoader

# 데이터셋을 미니배치 단위로 분리. shuffle은 미니배치 단위로 나눈걸 섞는지 물어보는거고 num_workers는 데이터를 불러올 때 사용할 subprocesses의 개수. 데이터를 불러와서 연산을 하는데 이 때 데이터 로딩에 얼마나 많은 GPU 자원을 사용할거냐 물어보는거다. 잘 정해야하는 hyper parameter다.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size = BATCH_SIZE, shuffle = True, num_workers = 4)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size = BATCH_SIZE, shuffle=True, num_workers = 4)

# 모델 설계

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.conv2d(64, 64, 3, padding=1)

        self.fc1 = nn.Linear(4096, 512)
        self.fc2 = nn.Linear(512, 33)

    def forward(self, x) :

        x = self.conv1(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, trainig=self.training) # self.training : 학습 모드일 때만 dropout을 사용

        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, training=self.training)

        x = self.conv3(x)
        x = F.relu(x)
        x = self.pool(x)
        x = F.dropout(x, p=0.25, training=self.training)

        x = x.view(-1, 4096) # Flatten
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc2(x)

        return F.log_softmax(x, dim=1)

model_base = Net().to(DEVICE) # DEVICE에서 돌릴 model_base 생성
optimizer = optim.Adam(model_base.parameters(), lr = 0.001) # 학습률 0.001로 model_base의 parameter들을 업데이트하는 Adam 생성

# 학습 함수, 평가 함수

In [None]:
def train(model, train_loader, optimizer) : # 한 에포크만큼 훈련 실행
    model.train() # 학습 모드로 변경
    for batch_idx, (data, target) in enumerate(train_loader) : # 배치 단위로 parameter 업데이트
        data, target = data.to(DEVICE), target.to(DEVICE) # to() : DEVICE에서 데이터 처리
        optimizer.zero_grad() # torch 모듈에 있는 그레디언트 초기화
        output = model(data)
        loss = F.cross_entropy(output, target)
        loss.backward() # loss에서 각 parameter에 해당하는 그레디언트 구함
        optimizer.step() # 가중치 업데이트

In [None]:
def evaluate(model, test_loader) : 
    model.eval() # 평가 모드(검증, 테스트 용도)로 변경
    test_loss = 0
    correct = 0

    with torch.no_grad(): # 평가를 하며 모델의 parameter를 업데이트 하는걸 중단. with문에 속한 영역에서만 torch.no_grad()가 적용됨
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            test_loss +=F.cross_entropy(output, target, reduction = 'sum').item() # 계산값 양식이 Tensor다. item()은 Tensor에 담긴 값을 사용하겠다는 것. 예 : tensor([[ 1]]).item = 1
            # reduction : Specifies the reduction to apply to the output. 'sum': the output will be summed. loss를 다 더한값을 반환 클래스별 loss가 있기 때문에 이걸 평균값(mean)을 낼지 다른걸 할지 결정. 기본값은 평균

            pred = output.max(1, keepdim = True)[1] # 확률 제일 높은걸 사용. Returns the maximum value of all elements in Tensor이고 keepdim = True이면 output과 같은 양식으로 반환됨. 1은 dim=1인데 이는 the dimension to reduce = 1이라는 뜻과 같다.
            # 그럼 [1]은 뭐지? 

            correct += preq.eq(target.view_as(pred)).sum().item() # target.view_as(pred) : pred와 같은 모양으로 정렬, eq는 두 텐서가 같은지 판단하는 비교 연산자로 같은면 1, 다르면 0 반환. 클래스별 확률값을 다 대조하는듯 한데 어짜피 한가지 객체에 대한 분류 모델이라 0 아니면 1이 된다. 

    test_loss /= len(test_loader.dataset) # 미니배치 단위로 구한걸 다 합쳤으니 이걸 미니배치 개수로 나눠 평균 loss를 구해줌. 
    test_accuracy = 100. * correct / len(test_loader.dataset) # 0~100% 중 하나가 나오겠지? 

    return test_loss, test_accuracy

# 학습 & 성능평가

In [None]:
import copy
import time 

def train_baseline(model, train_loader, val_loader, optimizer, num_epochs = 30) :
    best_acc = 0.0
    best_model_wts = copy.deepcopy(model.state_dict()) # 가장 성능이 좋은 모델을 저장하기 위함

    for epoch in range(1, num_epochs + 1):
        since = time.time()
        train(model, train_loader, optimizer)
        # loss 휙득
        train_loss, train_acc = evaluate(model, train_loader)
        val_loss, val_acc = evaluate(model, val_loader)

        if val_acc > best_acc : # 학습 과정에서 가장 적은 val_loss가 나온 모델을 최종 모델로 저장
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict()) # 저장

        time_elapsed = time.time() - since # 학습에 소요된 시간
        print('---------------- epoch {} ----------------'.format(epoch))

        print('train Loss : {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))
        print('val Loss : {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    model.load_state_dict(best_model_wts)
    return model 

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)

torch.save(base, 'baseline.pt')