In [None]:
# ==============================================================================
# 1. 라이브러리 설치, 임포트 및 장치 설정 (matplotlib 제외)
# ==============================================================================

!pip install efficientnet_pytorch
!pip install kagglehub

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms
from efficientnet_pytorch import EfficientNet
import os
import time
import copy
import numpy as np
from PIL import Image
from google.colab import drive

# GPU 설정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"사용 장치: {device}")

# 하이퍼파라미터
BATCH_SIZE = 32
NUM_EPOCHS = 10
LEARNING_RATE = 0.001
MODEL_NAME = 'efficientnet-b0'
INPUT_SIZE = EfficientNet.get_image_size(MODEL_NAME)

In [None]:
# ==============================================================================
# 2. 사용자 정의 데이터셋 및 레이블 설정 (회귀)
# ==============================================================================

# ⚠️ 모든 작물 클래스에 대한 통합 HEALTH_SCORES 정의
# 데이터 따라서 라벨링 수정하기
HEALTH_SCORES = {
    # 벼 (Rice)
    'healthy': 1.00,        # 완벽한 건강
    'brown_spot': 0.50,
    'leaf_blast': 0.40,
    'neck_blast': 0.10,     # 심각한 질병

    # 딸기 (Strawberry) - 예시 (실제 폴더 이름으로 수정 필요)
    'Strawberry_Healthy': 0.98,
    'Strawberry_Leaf_Scotch': 0.35,

    # 감자 (Potato) - 예시 (실제 폴더 이름으로 수정 필요)
    'Potato_Healthy': 0.95,
    'Potato_Early_Blight': 0.45,
    'Potato_Late_Blight': 0.15,

}

class CustomRegressionDataset(Dataset):
    """ImageFolder를 기반으로 폴더 이름을 HEALTH_SCORES 점수로 변환하는 클래스"""
    def __init__(self, root_dir, transform=None):
        self.data = datasets.ImageFolder(root_dir, transform=transform)

        self.health_labels = []
        for path, class_index in self.data.samples:
            class_name = self.data.classes[class_index]
            score = HEALTH_SCORES.get(class_name, 0.5)
            self.health_labels.append(score)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img, _ = self.data[idx]
        score = self.health_labels[idx]
        # 회귀 문제이므로 레이블은 float32 타입이어야 합니다.
        return img, torch.tensor(score, dtype=torch.float32)

In [None]:
# ==============================================================================
# 3. 모델 구조 및 데이터 로드/통합
# ==============================================================================

# EfficientNet을 회귀용으로 재정의
class EfficientNetRegression(nn.Module):
    def __init__(self, model_name):
        super(EfficientNetRegression, self).__init__()
        self.base_model = EfficientNet.from_pretrained(model_name)
        in_features = self.base_model._fc.in_features
        self.base_model._fc = nn.Identity()

        self.regressor = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(in_features, 1), # 출력 차원: 1
            nn.Sigmoid() # 0과 1 사이의 값으로 강제 출력
        )

    def forward(self, x):
        features = self.base_model(x)
        output = self.regressor(features)
        return output.squeeze(1)

# 데이터 변환 및 로드/통합
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(INPUT_SIZE), transforms.RandomResizedCrop(INPUT_SIZE),
        transforms.RandomHorizontalFlip(), transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    'test': transforms.Compose([
        transforms.Resize(INPUT_SIZE), transforms.CenterCrop(INPUT_SIZE),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

train_datasets = []
test_datasets = []

# 학습 데이터 통합 (CustomRegressionDataset 사용)
for dir_path in ALL_TRAIN_DIRS:
    if os.path.exists(dir_path):
        current_dataset = CustomRegressionDataset(dir_path, data_transforms['train'])
        train_datasets.append(current_dataset)
        print(f"✅ {os.path.basename(dir_path)} 로드 완료. 이미지 수: {len(current_dataset)}")
    else:
        print(f"❌ 경로를 찾을 수 없음 (Train): {dir_path}. 이 데이터셋은 제외됩니다.")

# 검증 데이터 통합
for dir_path in ALL_TEST_DIRS:
    if os.path.exists(dir_path):
        current_dataset = CustomRegressionDataset(dir_path, data_transforms['test'])
        test_datasets.append(current_dataset)

# 모든 작물 데이터셋을 하나의 큰 데이터셋으로 합치기
combined_train_dataset = ConcatDataset(train_datasets)
combined_test_dataset = ConcatDataset(test_datasets)

dataloaders = {
    'train': DataLoader(combined_train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4),
    'test': DataLoader(combined_test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
}

dataset_sizes = {'train': len(combined_train_dataset), 'test': len(combined_test_dataset)}

print(f"\n✅ 통합 학습 데이터셋 총 크기: {dataset_sizes['train']}")
print(f"✅ 통합 검증 데이터셋 총 크기: {dataset_sizes['test']}")

# 모델 초기화
model_ft = EfficientNetRegression(MODEL_NAME).to(device)

# ⚠️ 손실 함수를 회귀용 MSELoss (Mean Squared Error)로 변경
criterion = nn.MSELoss()

optimizer_ft = optim.Adam(model_ft.parameters(), lr=LEARNING_RATE)
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=5, gamma=0.1)

In [None]:
# ==============================================================================
# 4. 학습 및 저장
# ==============================================================================

def train_model(model, criterion, optimizer, scheduler, num_epochs=15):
    since = time.time()
    best_loss = float('inf')
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch+1}/{num_epochs}'); print('-' * 10)

        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()
                scheduler.step()
            else:
                model.eval()

            running_loss = 0.0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / dataset_sizes[phase]
            print(f'{phase} Loss (MSE): {epoch_loss:.6f}')

            # 최저 Loss 모델 저장
            if phase == 'test' and epoch_loss < best_loss:
                best_loss = epoch_loss
                best_model_wts = copy.deepcopy(model.state_dict())
                print(f'-> New best model found! Saving weights with Loss: {best_loss:.6f}')

    time_elapsed = time.time() - since
    print(f'\n학습 완료! 총 시간: {time_elapsed // 60:.0f}분 {time_elapsed % 60:.0f}초')
    print(f'최고 검증 Loss: {best_loss:.6f}')

    model.load_state_dict(best_model_wts)
    return model


In [None]:
# ------------------------------------------------------------------------------
# 모델 학습 실행 및 결과 저장
# ------------------------------------------------------------------------------

# 모델 학습 시작
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=NUM_EPOCHS)

In [None]:
# 구글 드라이브 마운트 및 모델 저장
drive.mount('/content/drive')
MODEL_SAVE_PATH = '/content/drive/MyDrive/multi_crop_health_regression_model.pth'

torch.save(model_ft.state_dict(), MODEL_SAVE_PATH)
print(f"\n✅ 모델 가중치가 {MODEL_SAVE_PATH}에 저장되었습니다.")