## 작물 잎 사진으로 질병 분류

### 프로젝트
- 작물 잎 사진으로 질병을 가지고 있는지 여부를 판단
- 원본데이터 : https://data.mendeley.com/datasets/tywbtsjrjv/1
- 데이터 다운로드 : https://drive.google.com/drive/folders/1QswvBejKJrc9tz7nNsxGxJblR2vwo6na

#### 데이터 분할을 위한 디렉토리 생성

In [3]:
import os
import shutil
 
original_dataset_dir = './dataset'              # 원본 데이터세트 경로 지정
classes_list = os.listdir(original_dataset_dir) # 모든 하위폴더 목록을 가져오기
 
base_dir = './splitted'                         # 나눈 데이터를 저장할 폴더 생성
os.mkdir(base_dir)
 
train_dir = os.path.join(base_dir, 'train')     # train, val, test 폴더 생성
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'val')
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
os.mkdir(test_dir)

for cls in classes_list:                        # 각 하위에 각각의 클래스 목록 폴더 생성
    os.mkdir(os.path.join(train_dir, cls))
    os.mkdir(os.path.join(validation_dir, cls))
    os.mkdir(os.path.join(test_dir, cls))

FileExistsError: [WinError 183] 파일이 이미 있으므로 만들 수 없습니다: './splitted'

#### 데이터 분할과 클래스별 데이터 수 

In [4]:
import math
 
for cls in classes_list:                                # 반복해서 모든 클래스에 대한 작업을 반복
    path = os.path.join(original_dataset_dir, cls)
    fnames = os.listdir(path)                           # path 에 존재하는 모든 이미지 파일의 목록을 변수 fnames에 저장
 
    train_size = math.floor(len(fnames) * 0.6)          # 각 작업별 데이터 비율 저장 6:2:2
    validation_size = math.floor(len(fnames) * 0.2)
    test_size = math.floor(len(fnames) * 0.2)
    
    train_fnames = fnames[:train_size]                      # Train 데이터에 해당하는 파일의 이름을 train_fnames에 저장
    print("Train size(",cls,"): ", len(train_fnames))
    for fname in train_fnames:
        src = os.path.join(path, fname)                     # 복사할 원본 파일의 경로 지정
        dst = os.path.join(os.path.join(train_dir, cls), fname) # 복사한 후 저장할 파일의 경로 지정
        shutil.copyfile(src, dst)                           # src 경로에 해당하는 파일을 dst 경로에 저장
        
    validation_fnames = fnames[train_size:(validation_size + train_size)]
    print("Validation size(",cls,"): ", len(validation_fnames))
    for fname in validation_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(validation_dir, cls), fname)
        shutil.copyfile(src, dst)
        
    test_fnames = fnames[(train_size+validation_size):(validation_size + train_size +test_size)]

    print("Test size(",cls,"): ", len(test_fnames))
    for fname in test_fnames:
        src = os.path.join(path, fname)
        dst = os.path.join(os.path.join(test_dir, cls), fname)
        shutil.copyfile(src, dst)

Train size( Apple___Apple_scab ):  378


NameError: name 'train_dir' is not defined

#### 베이스라인 모델 학습 준비

In [5]:
import torch
import os
 
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device('cuda' if USE_CUDA else 'cpu')
BATCH_SIZE = 256 
EPOCH = 30 

DEVICE

device(type='cuda')

In [None]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder 

# transform.compose() 이미지전처리
## augmentation - 증가율 좌우반전, 밝기 조절, 이미지 확대등
## 이미지 크기를 64*64로 조정
## Tensor 형태로 변환, 모든 값을 0~1사이로 정규화
transform_base = transforms.Compose([transforms.Resize((64,64)), transforms.ToTensor()])   
train_dataset = ImageFolder(root='./splitted/train', transform=transform_base) 
val_dataset = ImageFolder(root='./splitted/val', transform=transform_base)

In [7]:
from torch.utils.data import DataLoader

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

#### 베이스라인 모델 설계

In [8]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
 
class Net(nn.Module): 
  
    def __init__(self):    
        super(Net, self).__init__() 

        self.conv1 = nn.Conv2d(3, 32, 3, padding=1) 
        self.pool = nn.MaxPool2d(2,2)  
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)  
        self.conv3 = nn.Conv2d(64, 64, 3, padding=1)  

        self.fc1 = nn.Linear(4096, 512) 
        self.fc2 = nn.Linear(512, 33) 
    
    def forward(self, x):    
        x = self.conv1(x)
        x = F.relu(x)  
        x = self.pool(x) 
        x = F.dropout(x, p=0.25, training=self.training) 

        x = self.conv2(x)
        x = F.relu(x) 
        x = self.pool(x) 
        x = F.dropout(x, p=0.25, training=self.training)

        x = self.conv3(x) 
        x = F.relu(x) 
        x = self.pool(x) 
        x = F.dropout(x, p=0.25, training=self.training)

        x = x.view(-1, 4096)  
        x = self.fc1(x) 
        x = F.relu(x) 
        x = F.dropout(x, p=0.5, training=self.training)
        x = self.fc2(x) 

        return F.log_softmax(x, dim=1)  

model_base = Net().to(DEVICE)  
optimizer = optim.Adam(model_base.parameters(), lr=0.001)

#### 모델 훈련을 위한 함수

In [10]:
def train(model, train_loader, optimizer):
    model.train()  
    for batch_idx, (data, target) in enumerate(train_loader):   # (data, target) 형태가 미니배치 단위로
        data, target = data.to(DEVICE), target.to(DEVICE)       # data, target 변수를 사용중인 디바이스에 할당
        optimizer.zero_grad()                                   # 이전 Batch의 Gradient값을 optimizer로 초기화
        output = model(data)                                    # 데이터를 모델에 입력, output값을 계산
        loss = F.cross_entropy(output, target)                  # 분류에 적합한 cross_entropy 손실함수 사용
        loss.backward()                                         # Back Propagation(역전파)로 계산한 Gradient값을 각 파라미터에 할당
        optimizer.step()                                        # 모델의 파라미터를 업데이트

#### 모델 평가를 위한 함수

In [11]:
def evaluate(model, test_loader):
    model.eval()  
    test_loss = 0 
    correct = 0   
    
    with torch.no_grad(): 
        for data, target in test_loader:  
            data, target = data.to(DEVICE), target.to(DEVICE)  
            output = model(data) 
            
            test_loss += F.cross_entropy(output,target, reduction='sum').item() 
 
            
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item() 
   
    test_loss /= len(test_loader.dataset) 
    test_accuracy = 100. * correct / len(test_loader.dataset) 
    return test_loss, test_accuracy  

#### 모델 훈련 실행

In [12]:
import time
import copy
 
def train_baseline(model ,train_loader, val_loader, optimizer, num_epochs = 30):
    best_acc = 0.0  
    best_model_wts = copy.deepcopy(model.state_dict()) 
 
    for epoch in range(1, num_epochs + 1):
        since = time.time()  
        train(model, train_loader, optimizer)
        train_loss, train_acc = evaluate(model, train_loader) 
        val_loss, val_acc = evaluate(model, val_loader)
        
        if val_acc > best_acc: 
            best_acc = val_acc 
            best_model_wts = copy.deepcopy(model.state_dict())
        
        time_elapsed = time.time() - since 
        print('-------------- epoch {} ----------------'.format(epoch))
        print('train Loss: {:.4f}, Accuracy: {:.2f}%'.format(train_loss, train_acc))   
        print('val Loss: {:.4f}, Accuracy: {:.2f}%'.format(val_loss, val_acc))
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60)) 
    model.load_state_dict(best_model_wts)  
    return model
 

base = train_baseline(model_base, train_loader, val_loader, optimizer, EPOCH)  	 #(16)
torch.save(base,'baseline.pt')

-------------- epoch 1 ----------------
train Loss: 1.5871, Accuracy: 53.12%
val Loss: 1.6186, Accuracy: 51.96%
Completed in 2m 7s
-------------- epoch 2 ----------------
train Loss: 1.0419, Accuracy: 67.90%
val Loss: 1.0798, Accuracy: 66.84%
Completed in 2m 8s
-------------- epoch 3 ----------------
train Loss: 0.7576, Accuracy: 76.94%
val Loss: 0.8013, Accuracy: 75.33%
Completed in 2m 8s
-------------- epoch 4 ----------------
train Loss: 0.5775, Accuracy: 82.38%
val Loss: 0.6362, Accuracy: 80.19%
Completed in 2m 8s
-------------- epoch 5 ----------------
train Loss: 0.5258, Accuracy: 83.82%
val Loss: 0.5875, Accuracy: 81.88%
Completed in 2m 8s
-------------- epoch 6 ----------------
train Loss: 0.4466, Accuracy: 85.89%
val Loss: 0.5183, Accuracy: 83.61%
Completed in 2m 7s
-------------- epoch 7 ----------------
train Loss: 0.4422, Accuracy: 85.94%
val Loss: 0.5186, Accuracy: 83.51%
Completed in 2m 8s
-------------- epoch 8 ----------------
train Loss: 0.3641, Accuracy: 88.40%
val Lo

#### Transfer Learning 모델 학습 준비
- 높은 성능의 이미지 분류모델을 구축위해 많은 수, 질 좋은 데이터세트가 필요
- 미리 학습된 Pre-Trained Model, 이를 조정하는 과정을 Fine-Tuning이라고 함
- 이 기법을 모두 통틀어 Tranfer Learning이라 부름
- AlexNet, VGG, RegNet, SqueezeNet, DenseNet, Inception v3, GoogleNet, ResNeXt, ResNet 등...
- torchvision.models 패키지 밑에서 제공

In [13]:
data_transforms = {
    'train': transforms.Compose([transforms.Resize([64,64]), 
        transforms.RandomHorizontalFlip(), transforms.RandomVerticalFlip(),  
        transforms.RandomCrop(52), transforms.ToTensor(), 
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ]),
    
    'val': transforms.Compose([transforms.Resize([64,64]),  
        transforms.RandomCrop(52), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) ])
}

In [14]:
data_dir = './splitted' 
image_datasets = {x: ImageFolder(root=os.path.join(data_dir, x), transform=data_transforms[x]) for x in ['train', 'val']} 
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=BATCH_SIZE, shuffle=True, num_workers=4) for x in ['train', 'val']} 
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}

class_names = image_datasets['train'].classes

#### Pre-Trained Model 불러오기

In [15]:
from torchvision import models
 
resnet = models.resnet50(pretrained=True)  
num_ftrs = resnet.fc.in_features   
resnet.fc = nn.Linear(num_ftrs, 33) 
resnet = resnet.to(DEVICE)
 
criterion = nn.CrossEntropyLoss() 
optimizer_ft = optim.Adam(filter(lambda p: p.requires_grad, resnet.parameters()), lr=0.001)
 
from torch.optim import lr_scheduler
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) 

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to C:\Users\perso/.cache\torch\hub\checkpoints\resnet50-0676ba61.pth
100.0%


#### Pre-Trained Model의 일부 Layer Freeze

In [16]:
ct = 0 
for child in resnet.children():  
    ct += 1  
    if ct < 6: 
        for param in child.parameters():
            param.requires_grad = False

#### Transfer Learning 모델 학습과 검증을 위한 함수

In [17]:
def train_resnet(model, criterion, optimizer, scheduler, num_epochs=25):

    best_model_wts = copy.deepcopy(model.state_dict())  
    best_acc = 0.0  
    
    for epoch in range(num_epochs):
        print('-------------- epoch {} ----------------'.format(epoch+1)) 
        since = time.time()                                     
        for phase in ['train', 'val']: 
            if phase == 'train': 
                model.train() 
            else:
                model.eval()     
 
            running_loss = 0.0  
            running_corrects = 0  
 
            
            for inputs, labels in dataloaders[phase]: 
                inputs = inputs.to(DEVICE)  
                labels = labels.to(DEVICE)  
                
                optimizer.zero_grad() 
                
                with torch.set_grad_enabled(phase == 'train'):  
                    outputs = model(inputs)  
                    _, preds = torch.max(outputs, 1) 
                    loss = criterion(outputs, labels)  
    
                    if phase == 'train':   
                        loss.backward()
                        optimizer.step()
 
                running_loss += loss.item() * inputs.size(0)  
                running_corrects += torch.sum(preds == labels.data)  
            if phase == 'train':  
                scheduler.step()
 
            epoch_loss = running_loss/dataset_sizes[phase]  
            epoch_acc = running_corrects.double()/dataset_sizes[phase]  
 
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc)) 
 
          
            if phase == 'val' and epoch_acc > best_acc: 
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
 
        time_elapsed = time.time() - since  
        print('Completed in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
 
    model.load_state_dict(best_model_wts) 

    return model

#### 모델 학습을 실행

In [18]:
model_resnet50 = train_resnet(resnet, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=EPOCH) 

torch.save(model_resnet50, 'resnet50.pt')

-------------- epoch 1 ----------------
train Loss: 0.5863 Acc: 0.8223
val Loss: 0.3316 Acc: 0.8992
Completed in 2m 4s
-------------- epoch 2 ----------------
train Loss: 0.2183 Acc: 0.9289
val Loss: 0.2660 Acc: 0.9158
Completed in 1m 57s
-------------- epoch 3 ----------------
train Loss: 0.1608 Acc: 0.9483
val Loss: 0.1233 Acc: 0.9598
Completed in 1m 58s
-------------- epoch 4 ----------------
train Loss: 0.1393 Acc: 0.9551
val Loss: 0.1677 Acc: 0.9454
Completed in 1m 58s
-------------- epoch 5 ----------------
train Loss: 0.1203 Acc: 0.9616
val Loss: 0.1155 Acc: 0.9611
Completed in 1m 57s
-------------- epoch 6 ----------------
train Loss: 0.0988 Acc: 0.9685
val Loss: 0.1036 Acc: 0.9655
Completed in 2m 1s
-------------- epoch 7 ----------------
train Loss: 0.0723 Acc: 0.9765
val Loss: 0.0791 Acc: 0.9742
Completed in 2m 1s
-------------- epoch 8 ----------------
train Loss: 0.0405 Acc: 0.9876
val Loss: 0.0358 Acc: 0.9877
Completed in 1m 60s
-------------- epoch 9 ----------------
tra

#### 모델 평가

In [19]:
transform_base = transforms.Compose([transforms.Resize([64,64]),transforms.ToTensor()])
test_base = ImageFolder(root='./splitted/test',transform=transform_base)  
test_loader_base = torch.utils.data.DataLoader(test_base, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

#### Transfer Learning모델 평가를 위한 전처리

In [20]:
transform_resNet = transforms.Compose([
        transforms.Resize([64,64]),  
        transforms.RandomCrop(52),  
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) 
    ])
    
test_resNet = ImageFolder(root='./splitted/test', transform=transform_resNet) 
test_loader_resNet = torch.utils.data.DataLoader(test_resNet, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

#### 베이스라인 모델 성능 평가

In [22]:
baseline=torch.load('./baseline.pt') 
baseline.eval()  
test_loss, test_accuracy = evaluate(baseline, test_loader_base)

print('baseline test acc:  ', test_accuracy)

UnpicklingError: Weights only load failed. This file can still be loaded, to do so you have two options, [1mdo those steps only if you trust the source of the checkpoint[0m. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL __main__.Net was not an allowed global by default. Please use `torch.serialization.add_safe_globals([Net])` or the `torch.serialization.safe_globals([Net])` context manager to allowlist this global if you trust this class/function.

Check the documentation of torch.load to learn more about types accepted by default with weights_only https://pytorch.org/docs/stable/generated/torch.load.html.

#### Transfer Learning 모델 성능 평가

In [23]:
resnet50=torch.load('resnet50.pt') 
resnet50.eval()  
test_loss, test_accuracy = evaluate(resnet50, test_loader_resNet)

print('ResNet test acc:  ', test_accuracy)

UnpicklingError: Weights only load failed. This file can still be loaded, to do so you have two options, [1mdo those steps only if you trust the source of the checkpoint[0m. 
	(1) In PyTorch 2.6, we changed the default value of the `weights_only` argument in `torch.load` from `False` to `True`. Re-running `torch.load` with `weights_only` set to `False` will likely succeed, but it can result in arbitrary code execution. Do it only if you got the file from a trusted source.
	(2) Alternatively, to load with `weights_only=True` please check the recommended steps in the following error message.
	WeightsUnpickler error: Unsupported global: GLOBAL torchvision.models.resnet.ResNet was not an allowed global by default. Please use `torch.serialization.add_safe_globals([ResNet])` or the `torch.serialization.safe_globals([ResNet])` context manager to allowlist this global if you trust this class/function.

Check the documentation of torch.load to learn more about types accepted by default with weights_only https://pytorch.org/docs/stable/generated/torch.load.html.