In [None]:
import torch
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device  # last expression of the cell, so the notebook displays the selected device

In [None]:
# Download and prepare the dataset on Google Colab
from google.colab import files
files.upload()  # upload the 'kaggle.json' file

# Place kaggle.json under ~/.kaggle and make it readable/writable by the owner only
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the almond dataset
!kaggle datasets download -d mahyeks/almond-varieties

# Extract the archive into the almond folder
!unzip almond-varieties.zip -d almond

In [None]:
# EDA: class balance, one example image per class, image sizes and file extensions

import os
import cv2
import random
import numpy as np
import matplotlib.pyplot as plt

from collections import Counter

# Dataset folder path (one sub-folder per class)
root_dir = 'almond/dataset'

# Class names are the sub-folder names
labels = os.listdir(root_dir)

# Accumulators for the per-class and per-image statistics
data_counts = {}
example_images = {}
image_sizes = []
aspect_ratios = []
# NOTE(review): color_distributions and brightness_values are initialised here
# but nothing in this cell fills them.
color_distributions = []
brightness_values = []
file_types = Counter()

for label in labels:
    data_dir = os.path.join(root_dir, label)
    datas = os.listdir(data_dir)
    data_counts[label] = len(datas)

    # Remember one random image per class to display later
    if datas:
        example_images[label] = os.path.join(data_dir, random.choice(datas))

    # Collect file extension, size and aspect ratio for every image
    for img_name in datas:
        img_path = os.path.join(data_dir, img_name)
        file_extension = os.path.splitext(img_name)[1].lower()
        file_types[file_extension] += 1

        img = cv2.imread(img_path)
        # cv2.imread returns None for files it cannot decode; those are skipped
        if img is not None:
            h, w, _ = img.shape
            image_sizes.append((w, h))
            aspect_ratios.append(w / h)

# Plot the number of images per class
plt.figure(figsize=(8, 5))
plt.bar(data_counts.keys(), data_counts.values(), color='skyblue')
plt.xlabel('Labels')
plt.ylabel('Number of Images')
plt.title('Dataset Distribution')
plt.show()

# Show one example image per class
# NOTE(review): with a single class plt.subplots returns one Axes rather than
# an array, so the zip below would fail — confirm there are always >= 2 classes.
fig, axes = plt.subplots(1, len(example_images), figsize=(15, 5))

for ax, (label, img_path) in zip(axes, example_images.items()):
    img = cv2.imread(img_path)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR; matplotlib expects RGB
    ax.imshow(img)
    ax.set_title(label)
    ax.axis('off')

plt.show()

# Print the number of images per class
print("Dataset Image Count:")
for label, count in data_counts.items():
    print(f"{label}: {count} images")

# Print the smallest and largest image (by pixel area)
if image_sizes:
    min_size = min(image_sizes, key=lambda x: x[0] * x[1])
    max_size = max(image_sizes, key=lambda x: x[0] * x[1])
    # image_sizes holds (w, h) tuples, so these lines print height x width
    print(f"Smallest Image Size: {min_size[1]}x{min_size[0]}")
    print(f"Largest Image Size: {max_size[1]}x{max_size[0]}")

# Print how many files there are per extension
print("File Type Distribution:")
for ext, count in file_types.items():
    print(f"{ext}: {count} files")


**초기 실험**

In [None]:
# data 비율 별 dataloader

import torch

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

def set_dataloader(split_ratio):
    """Build per-class-split train/validation/test DataLoaders from ``root_dir``.

    Every sub-folder of the module-level ``root_dir`` is treated as one class.
    Each class is shuffled and split independently: the first
    ``int(split_ratio * n)`` images go to training, the next
    ``int(split_ratio / 2 * n)`` to validation and the remainder to test.

    Args:
        split_ratio: Fraction of each class used for training, in ``(0, 1]``.
            The validation fraction is half of it, so ratios around 2/3 or
            above leave few or no test images (a warning is printed when the
            test split ends up empty).

    Returns:
        Tuple ``(dataloader_train, dataloader_val, dataloader_test, num_classes)``.

    Raises:
        ValueError: If ``split_ratio`` is outside ``(0, 1]``.
    """
    if not 0 < split_ratio <= 1:
        raise ValueError(f"split_ratio must be in (0, 1], got {split_ratio}")

    # Class names are the sub-folder names; class indices follow os.listdir order.
    labels = os.listdir(root_dir)
    label_to_index = {label: idx for idx, label in enumerate(labels)}
    num_classes = len(labels)

    # Collect (image path, class index) pairs per class so that every class is
    # split with the same proportions.
    data_list = {label: [] for label in labels}
    for label in labels:
        data_dir = os.path.join(root_dir, label)
        for img_name in os.listdir(data_dir):
            img_path = os.path.join(data_dir, img_name)
            data_list[label].append((img_path, label_to_index[label]))

    # Split each class separately
    train_data, val_data, test_data = [], [], []
    for items in data_list.values():
        random.shuffle(items)
        total_size = len(items)
        train_end = int(split_ratio * total_size)
        val_end = train_end + int((split_ratio/2) * total_size)

        train_data.extend(items[:train_end])
        val_data.extend(items[train_end:val_end])
        test_data.extend(items[val_end:])

    if not test_data:
        # train + validation already consumed every image of every class
        print(f"⚠️ split_ratio={split_ratio} leaves no test images; the test loader will be empty.")

    class CustomImageDataset(Dataset):
        """Dataset over ``(image path, class index)`` pairs, loaded with OpenCV."""

        def __init__(self, data, transform=None):
            self.data = data
            self.transform = transform

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            img_path, label = self.data[idx]
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                raise OSError(f"Could not read image file: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

            if self.transform:
                img = self.transform(img)

            return img, label

    # Resize to 224x224 and normalise with the mean/std values given below
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = CustomImageDataset(train_data, transform=transform)
    val_dataset = CustomImageDataset(val_data, transform=transform)
    test_dataset = CustomImageDataset(test_data, transform=transform)

    batch_size = 16

    # Only the training loader is shuffled
    dataloader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    dataloader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    dataloader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=4)

    print(f"Training Dataset Size: {len(train_dataset)}")
    print(f"Validation Dataset Size: {len(val_dataset)}")
    print(f"Test Dataset Size: {len(test_dataset)}")

    # Sanity check: show the shape and labels of the first training batch
    for images, batch_labels in dataloader_train:
        print(f"Train Batch Image Shape: {images.shape}")
        print(f"Train Batch Labels: {batch_labels}")
        break

    return dataloader_train, dataloader_val, dataloader_test, num_classes

In [None]:
# 모델 정의 함수

import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

def get_model(model_name, num_classes):
    """Create a torchvision model with ``pretrained=True`` and a new output layer.

    Args:
        model_name: One of 'resnet50', 'efficientnet_b0', 'mobilenet_v3_small'
            or 'shufflenet_v2_x1_0'.
        num_classes: Number of outputs of the replacement linear layer.

    Returns:
        The model with its last linear layer replaced.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    fc_head = ('resnet50', 'shufflenet_v2_x1_0')
    classifier_head = ('efficientnet_b0', 'mobilenet_v3_small')
    if model_name not in fc_head and model_name not in classifier_head:
        raise ValueError("Unsupported model")

    # Every supported name is also the name of its torchvision constructor.
    net = getattr(models, model_name)(pretrained=True)

    if model_name in fc_head:
        net.fc = nn.Linear(net.fc.in_features, num_classes)
        return net

    # The final linear layer sits at index 1 (EfficientNet) or 3 (MobileNetV3)
    slot = 1 if model_name == 'efficientnet_b0' else 3
    net.classifier[slot] = nn.Linear(net.classifier[slot].in_features, num_classes)
    return net

In [None]:
# 학습 함수
def train_model(model_name, train_loader, val_loader, num_classes, save_dir, device, split_ratio, epochs=10, lr=0.001, patience=3):
    """Fine-tune a model with Adam and early stopping on validation loss.

    Writes ``training_log_<model_name>.txt`` and, whenever the validation loss
    improves, ``best_model_<model_name>.pth`` (a ``state_dict``) into
    ``save_dir``.

    Args:
        model_name: Name passed to ``get_model``.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        num_classes: Number of output classes.
        save_dir: Directory for the log and checkpoint (created if missing).
        device: Device the model and batches are moved to.
        split_ratio: Train ratio; only recorded in the log header.
        epochs: Maximum number of epochs.
        lr: Adam learning rate.
        patience: Number of consecutive epochs without validation-loss
            improvement after which training stops.

    Raises:
        ValueError: If ``train_loader`` has no batches.
    """
    if len(train_loader) == 0:
        raise ValueError("train_loader is empty; nothing to train on")

    model = get_model(model_name, num_classes=num_classes)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_val_loss = float('inf')
    no_improve_count = 0

    os.makedirs(save_dir, exist_ok=True)

    # The context manager closes the log even if training raises mid-epoch.
    with open(f"{save_dir}/training_log_{model_name}.txt", "w") as log_file:
        log_file.write(f"Model: {model_name}, Epochs: {epochs}, Learning Rate: {lr}, Patience: {patience}, Train Ratio: {split_ratio*100}%, Validation Ratio: {split_ratio*50}%\n")

        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            correct = 0
            total = 0

            for images, labels in train_loader:
                images, labels = images.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(images)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            train_loss = running_loss / len(train_loader)  # mean of per-batch losses
            train_acc = correct / total
            val_loss, val_acc = validate_model(model, val_loader, device, criterion)

            # This exact line format is what the log parser's regex expects.
            summary = f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}"
            log_file.write(summary + "\n")
            print(summary)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                no_improve_count = 0
                torch.save(model.state_dict(), f"{save_dir}/best_model_{model_name}.pth")
            else:
                no_improve_count += 1

            if no_improve_count >= patience:
                # The stopping criterion is validation loss, not accuracy.
                print(f"Early stopping at epoch {epoch+1} due to no improvement in validation loss.")
                log_file.write(f"Early stopping at epoch {epoch+1}\n")
                break

    print(f"Best Validation Loss: {best_val_loss:.4f}")

# validation 평가 함수 (Loss와 Accuracy 반환)
def validate_model(model, dataloader, device, criterion):
    """Evaluate ``model`` on ``dataloader`` without gradient tracking.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        dataloader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.
        criterion: Loss function called as ``criterion(outputs, labels)``.

    Returns:
        Tuple ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean of the
        per-batch losses and ``accuracy`` is correct / total samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0  # counted here so iterables without __len__ also work
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            num_batches += 1

    if num_batches == 0 or total == 0:
        # Previously surfaced as a bare ZeroDivisionError.
        raise ValueError("dataloader is empty; cannot compute validation loss/accuracy")

    avg_loss = total_loss / num_batches
    accuracy = correct / total
    return avg_loss, accuracy

In [None]:
def create_next_directory(base_path):
    """Create and return the next numbered sub-directory of ``base_path``.

    Sub-directories whose names are decimal integers are considered; the new
    directory is named ``max + 1``, or ``1`` when none exist. ``base_path``
    itself is created if it does not exist yet.

    Args:
        base_path: Parent directory that holds the numbered run folders.

    Returns:
        Path of the newly created directory.
    """
    # A fresh Drive folder may not contain the parent yet.
    os.makedirs(base_path, exist_ok=True)

    # isdecimal() (unlike isdigit()) only accepts strings that int() can parse.
    numbers = [
        int(name)
        for name in os.listdir(base_path)
        if name.isdecimal() and os.path.isdir(os.path.join(base_path, name))
    ]

    next_number = max(numbers) + 1 if numbers else 1
    new_dir = os.path.join(base_path, str(next_number))

    os.makedirs(new_dir, exist_ok=True)
    print(f"Created directory: {new_dir}")

    return new_dir

In [None]:
# 평가 함수
import seaborn as sns

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

# Fine-tuned 모델 불러오는 함수
def load_fine_tuned_model(model_name, num_classes, checkpoint_path, device):
    """Rebuild a model architecture and load fine-tuned weights into it.

    The backbone is created without pretrained weights because every parameter
    is overwritten by the checkpoint's ``state_dict`` anyway.

    Args:
        model_name: One of 'resnet50', 'efficientnet_b0', 'mobilenet_v3_small'
            or 'shufflenet_v2_x1_0'.
        num_classes: Number of outputs of the classifier layer in the checkpoint.
        checkpoint_path: Path to a ``state_dict`` file.
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        The model with the checkpoint weights loaded, on ``device``.

    Raises:
        ValueError: If ``model_name`` is not supported.
    """
    if model_name == 'resnet50':
        model = models.resnet50(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif model_name == 'efficientnet_b0':
        model = models.efficientnet_b0(pretrained=False)
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
    elif model_name == 'mobilenet_v3_small':
        model = models.mobilenet_v3_small(pretrained=False)
        model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
    elif model_name == 'shufflenet_v2_x1_0':
        model = models.shufflenet_v2_x1_0(pretrained=False)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    else:
        raise ValueError("Unsupported model")

    # map_location lets a checkpoint saved on GPU be loaded on a CPU-only machine.
    state_dict = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(state_dict)
    model.to(device)
    return model


# Confusion Matrix 시각화 함수
def plot_confusion_matrix(y_true, y_pred, classes, model_name):
    """Show the confusion matrix of ``y_pred`` against ``y_true`` as a heatmap.

    Args:
        y_true: True class indices.
        y_pred: Predicted class indices.
        classes: Tick labels used on both axes.
        model_name: Name shown in the figure title.
    """
    matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    # heatmap draws on (and returns) the current axes of the new figure
    ax = sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    ax.set_title(f'{model_name} Confusion Matrix')
    plt.show()

# 모델 성능 평가 및 Confusion Matrix 출력
def evaluate_model(model_name, dir_num, dataloader, device, num_classes=None):
    """Load the best checkpoint of a run, print test metrics and plot the confusion matrix.

    Args:
        model_name: Model whose ``best_model_<model_name>.pth`` is loaded.
        dir_num: Run directory that contains the checkpoint.
        dataloader: Iterable of ``(images, labels)`` test batches.
        device: Device used for inference.
        num_classes: Number of classes of the checkpoint's classifier. When
            omitted, the number of class folders under 'almond/dataset' is
            used instead of relying on a notebook-level global.
    """
    # Class names are the dataset sub-folder names
    root_dir = 'almond/dataset'
    class_names = os.listdir(root_dir)
    if num_classes is None:
        num_classes = len(class_names)

    checkpoint_path = os.path.join(dir_num, f"best_model_{model_name}.pth")
    model = load_fine_tuned_model(model_name, num_classes, checkpoint_path, device)
    model.eval()

    all_preds = []
    all_labels = []
    with torch.no_grad():
        for images, batch_labels in dataloader:
            images, batch_labels = images.to(device), batch_labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

    # Precision/recall/F1 are averaged with average='weighted'
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1 Score: {f1:.4f}")

    plot_confusion_matrix(all_labels, all_preds, class_names, model_name)

# 평가 함수
import os
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from torch import nn
from torchvision import models
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

def process_single_experiment(dataloader, device, num_classes, experiment_dir):
    """Plot the training curves of one experiment folder and evaluate its models.

    For every ``training_log_*.txt`` in ``experiment_dir`` the loss/accuracy
    curves are saved as a PNG into the matching ``test_dir`` folder, and
    ``evaluate_model`` is called for the model named in the file name.

    Args:
        dataloader: Test batches passed on to ``evaluate_model``.
        device: Device passed on to ``evaluate_model``.
        num_classes: Number of classes passed on to ``evaluate_model``.
        experiment_dir: Experiment folder (e.g. ``.../train_dir/1``).
    """
    # Output folder: same path with "train_dir" swapped for "test_dir"
    output_dir = experiment_dir.replace("train_dir", "test_dir")

    if not os.path.exists(experiment_dir):
        print(f"⚠️ 유효하지 않은 실험 디렉토리: {experiment_dir}")
        return

    # Training logs that live directly inside this experiment folder
    log_paths = []
    for entry in os.listdir(experiment_dir):
        if entry.startswith("training_log_") and entry.endswith(".txt"):
            log_paths.append(os.path.join(experiment_dir, entry))

    if len(log_paths) == 0:
        print(f"❌ {experiment_dir}에서 training_log 파일을 찾을 수 없습니다.")
        return

    os.makedirs(output_dir, exist_ok=True)

    # (y-axis label / title word, train column, validation column) per panel
    panels = (
        ("Loss", "Train Loss", "Val Loss"),
        ("Accuracy", "Train Acc", "Val Acc"),
    )

    for log_path in log_paths:
        df = parse_log(log_path)
        if df is None:
            # Nothing parseable in this log; skip it
            continue

        model_name = extract_model_name(os.path.basename(log_path))

        # Two side-by-side panels: loss on the left, accuracy on the right
        plt.figure(figsize=(12, 5))
        for position, (metric, train_col, val_col) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            plt.plot(df["Epoch"], df[train_col], label=train_col, marker="o")
            plt.plot(df["Epoch"], df[val_col], label=val_col, marker="s")
            plt.xlabel("Epoch")
            plt.ylabel(metric)
            plt.title(f"Training & Validation {metric} ({model_name})")
            plt.legend()
            plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f"training_log_{model_name}.png"))
        plt.close()

        # Confusion matrix and metrics for this model
        evaluate_model(model_name, experiment_dir, dataloader, device, num_classes)

    print(f"✅ {experiment_dir}의 학습 로그 및 Confusion Matrix가 {output_dir}에 저장되었습니다.")



In [None]:
# Sweep over large train ratios: for each ratio build the loaders, create a new
# numbered run directory, fine-tune the four lightweight models (lr=0.001), then
# plot the logs and evaluate the run on the test loader.
# NOTE(review): set_dataloader uses ratio/2 for validation, so for 0.8 and 0.7
# train + validation appear to consume every image and the test loader is empty.
# NOTE(review): process_single_experiment calls parse_log / extract_model_name and a
# five-argument evaluate_model, which this notebook only defines in a later cell.
for ratio in [0.8, 0.7, 0.6, 0.5, 0.4]:
    dataloader_train, dataloader_val, dataloader_test, num_classes = set_dataloader(ratio)

    save_dir = create_next_directory('/content/drive/MyDrive/KTB/personal_mission/train_dir')
    for model_name in ['resnet50', 'efficientnet_b0', 'mobilenet_v3_small', 'shufflenet_v2_x1_0']:
        print(f"Training {model_name}...")
        train_model(model_name, dataloader_train, dataloader_val, num_classes, save_dir, device, ratio, epochs=100, lr=0.001, patience=10)
    process_single_experiment(dataloader_test, device, num_classes, save_dir)

In [None]:
# Same sweep as the previous cell for small train ratios (down to 1%), with a
# lower learning rate (lr=0.0001). Each ratio gets its own numbered run directory.
# NOTE(review): process_single_experiment calls parse_log / extract_model_name and a
# five-argument evaluate_model, which this notebook only defines in a later cell.
for ratio in [0.3, 0.2, 0.1, 0.08, 0.06, 0.04, 0.03, 0.02, 0.01]:
    dataloader_train, dataloader_val, dataloader_test, num_classes = set_dataloader(ratio)

    save_dir = create_next_directory('/content/drive/MyDrive/KTB/personal_mission/train_dir')
    for model_name in ['resnet50', 'efficientnet_b0', 'mobilenet_v3_small', 'shufflenet_v2_x1_0']:
        print(f"Training {model_name}...")
        train_model(model_name, dataloader_train, dataloader_val, num_classes, save_dir, device, ratio, epochs=100, lr=0.0001, patience=10)
    process_single_experiment(dataloader_test, device, num_classes, save_dir)

In [None]:
# 모델 성능 평가 및 Confusion Matrix 저장
def evaluate_baseline_model(model_name, dir_path, dataloader, device, num_classes):
    """Evaluate a model from ``get_model`` without any fine-tuning.

    The classifier layer is freshly initialised by ``get_model``, so the
    metrics serve as a no-training baseline. The metrics are printed, the
    confusion matrix is saved as a PNG and the metrics as a text file.

    Relies on the notebook-level ``train_root`` / ``test_root`` paths and on
    the five-argument ``plot_confusion_matrix`` (the one that takes a save
    path); both must be defined before this function is called.

    Args:
        model_name: Name passed to ``get_model``.
        dir_path: Output directory; a ``train_root`` prefix is replaced by
            ``test_root``.
        dataloader: Iterable of ``(images, labels)`` test batches.
        device: Device used for inference.
        num_classes: Number of output classes.
    """
    # get_model either returns a model or raises, so no None check is needed.
    model = get_model(model_name, num_classes)
    model.to(device)
    model.eval()

    all_preds, all_labels = [], []
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Class names are the dataset sub-folder names
    root_dir = 'almond/dataset'
    classes = os.listdir(root_dir)

    # Precision/recall/F1 are averaged with average='weighted'
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1 Score: {f1:.4f}")

    # Map a train_dir path to test_dir; other paths are left unchanged
    save_path = dir_path.replace(train_root, test_root)
    os.makedirs(save_path, exist_ok=True)
    cm_save_path = os.path.join(save_path, f"confusion_matrix_{model_name}.png")

    plot_confusion_matrix(all_labels, all_preds, classes, model_name, cm_save_path)

    # Persist the metrics next to the confusion matrix
    metrics_save_path = os.path.join(save_path, f"metrics_{model_name}.txt")
    with open(metrics_save_path, "w") as f:
        f.write(f"Model: {model_name}\n")
        f.write(f"Test Accuracy: {accuracy:.4f}\n")
        f.write(f"Test Precision: {precision:.4f}\n")
        f.write(f"Test Recall: {recall:.4f}\n")
        f.write(f"Test F1 Score: {f1:.4f}\n")

    print(f"📄 {metrics_save_path}에 평가지표 저장 완료!")

In [None]:
# Baseline: evaluate each model without fine-tuning on the test split produced
# by its paired train ratio. Only the test loader of each split is used here.
# NOTE(review): evaluate_baseline_model reads train_root / test_root and calls the
# five-argument plot_confusion_matrix, all defined in a later cell of this notebook.
for model_name, ratio in zip(['resnet50', 'efficientnet_b0', 'mobilenet_v3_small', 'shufflenet_v2_x1_0'], [0.01, 0.02, 0.08, 0.03]):
    dataloader_train, dataloader_val, dataloader_test, num_classes = set_dataloader(ratio)

    save_dir = '/content/drive/MyDrive/KTB/personal_mission/baseline'

    evaluate_baseline_model(model_name, save_dir, dataloader_test, device, num_classes)

**후기 실험**

In [None]:
# 소수의 dataset으로 구성된 dataloader

import torch

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

def set_dataloader(*, train_per_class=4, val_per_class=2, batch_size=4, num_workers=4):
    """Build few-shot train/validation/test DataLoaders from ``root_dir``.

    Every sub-folder of the module-level ``root_dir`` is one class. Each class
    is shuffled; its first ``train_per_class`` images go to training, the next
    ``val_per_class`` to validation and all remaining images to test.

    NOTE: this definition replaces the earlier ``set_dataloader(split_ratio)``
    of this notebook once the cell is run. The parameters are keyword-only so
    that an old-style positional ``set_dataloader(ratio)`` call still fails
    immediately instead of being misread as an image count.

    Args:
        train_per_class: Training images taken from each class (default 4).
        val_per_class: Validation images taken from each class (default 2).
        batch_size: Batch size of all three loaders (default 4).
        num_workers: Worker processes per loader (default 4).

    Returns:
        Tuple ``(dataloader_train, dataloader_val, dataloader_test, num_classes)``.

    Raises:
        ValueError: If a per-class count is not a non-negative integer.
    """
    for arg_name, arg_value in (("train_per_class", train_per_class), ("val_per_class", val_per_class)):
        if not isinstance(arg_value, int) or arg_value < 0:
            raise ValueError(f"{arg_name} must be a non-negative integer, got {arg_value!r}")

    # Class names are the sub-folder names; class indices follow os.listdir order.
    labels = os.listdir(root_dir)
    label_to_index = {label: idx for idx, label in enumerate(labels)}
    num_classes = len(labels)

    # Collect (image path, class index) pairs per class
    data_list = {label: [] for label in labels}
    for label in labels:
        data_dir = os.path.join(root_dir, label)
        for img_name in os.listdir(data_dir):
            img_path = os.path.join(data_dir, img_name)
            data_list[label].append((img_path, label_to_index[label]))

    # Fixed number of train/validation images per class; the rest is test data
    val_end = train_per_class + val_per_class
    train_data, val_data, test_data = [], [], []
    for items in data_list.values():
        random.shuffle(items)

        train_data.extend(items[:train_per_class])
        val_data.extend(items[train_per_class:val_end])
        test_data.extend(items[val_end:])

    class CustomImageDataset(Dataset):
        """Dataset over ``(image path, class index)`` pairs, loaded with OpenCV."""

        def __init__(self, data, transform=None):
            self.data = data
            self.transform = transform

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            img_path, label = self.data[idx]
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                raise OSError(f"Could not read image file: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

            if self.transform:
                img = self.transform(img)

            return img, label

    # Resize to 224x224 and normalise with the mean/std values given below
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = CustomImageDataset(train_data, transform=transform)
    val_dataset = CustomImageDataset(val_data, transform=transform)
    test_dataset = CustomImageDataset(test_data, transform=transform)

    # Only the training loader is shuffled
    dataloader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    dataloader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    dataloader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    print(f"Training Dataset Size: {len(train_dataset)}")
    print(f"Validation Dataset Size: {len(val_dataset)}")
    print(f"Test Dataset Size: {len(test_dataset)}")

    # Sanity check: show the shape and labels of the first training batch
    for images, batch_labels in dataloader_train:
        print(f"Train Batch Image Shape: {images.shape}")
        print(f"Train Batch Labels: {batch_labels}")
        break

    return dataloader_train, dataloader_val, dataloader_test, num_classes

In [None]:
# augmentation dataloader

import torch

from PIL import Image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

def set_aug_dataloader(*, train_per_class=4, val_per_class=2, batch_size=4, num_workers=4):
    """Build few-shot DataLoaders whose training set is augmented by rotation.

    Every sub-folder of the module-level ``root_dir`` is one class. Each class
    is shuffled; its first ``train_per_class`` images go to training, the next
    ``val_per_class`` to validation and the rest to test. Every training image
    is served four times, rotated by 0, 90, 180 and 270 degrees. Validation and
    test images are not augmented.

    Args:
        train_per_class: Training images taken from each class (default 4).
        val_per_class: Validation images taken from each class (default 2).
        batch_size: Batch size of all three loaders (default 4).
        num_workers: Worker processes per loader (default 4).

    Returns:
        Tuple ``(dataloader_train, dataloader_val, dataloader_test, num_classes)``.

    Raises:
        ValueError: If a per-class count is not a non-negative integer.
    """
    for arg_name, arg_value in (("train_per_class", train_per_class), ("val_per_class", val_per_class)):
        if not isinstance(arg_value, int) or arg_value < 0:
            raise ValueError(f"{arg_name} must be a non-negative integer, got {arg_value!r}")

    # Class names are the sub-folder names; class indices follow os.listdir order.
    labels = os.listdir(root_dir)
    label_to_index = {label: idx for idx, label in enumerate(labels)}
    num_classes = len(labels)

    # Collect (image path, class index) pairs per class
    data_list = {label: [] for label in labels}
    for label in labels:
        data_dir = os.path.join(root_dir, label)
        for img_name in os.listdir(data_dir):
            img_path = os.path.join(data_dir, img_name)
            data_list[label].append((img_path, label_to_index[label]))

    # Fixed number of train/validation images per class; the rest is test data
    val_end = train_per_class + val_per_class
    train_data, val_data, test_data = [], [], []
    for items in data_list.values():
        random.shuffle(items)

        train_data.extend(items[:train_per_class])
        val_data.extend(items[train_per_class:val_end])
        test_data.extend(items[val_end:])

    # Load the training images into memory once
    train_image_list = []
    train_label_list = []
    for img_path, label in train_data:
        # convert() returns an independent RGB copy, so the file can be closed right away
        with Image.open(img_path) as opened:
            train_image_list.append(opened.convert('RGB'))
        train_label_list.append(label)

    class RotatedAugmentedDataset(Dataset):
        """Serves every image once per rotation angle."""

        def __init__(self, image_list, label_list):
            self.image_list = image_list
            self.label_list = label_list

            # Tensor conversion + normalisation (resizing happens in __getitem__)
            self.base_transform = transforms.Compose([
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                    std=[0.229, 0.224, 0.225])
            ])

            # Rotation angles in degrees
            self.angles = [0, 90, 180, 270]

        def __len__(self):
            # One sample per (image, angle) pair
            return len(self.image_list) * len(self.angles)

        def __getitem__(self, idx):
            # Consecutive indices walk through the angles of the same image
            img_idx, angle_idx = divmod(idx, len(self.angles))

            image = self.image_list[img_idx]
            label = self.label_list[img_idx]

            # Resize first so the rotated image keeps the 224x224 size
            resized_image = image.resize((224, 224))
            rotated_image = resized_image.rotate(self.angles[angle_idx])

            return self.base_transform(rotated_image), label

    class CustomImageDataset(Dataset):
        """Dataset over ``(image path, class index)`` pairs, loaded with OpenCV."""

        def __init__(self, data, transform=None):
            self.data = data
            self.transform = transform

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            img_path, label = self.data[idx]
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread signals an unreadable file by returning None
                raise OSError(f"Could not read image file: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # OpenCV loads BGR

            if self.transform:
                img = self.transform(img)

            return img, label

    # Validation/test preprocessing: resize + normalise, no augmentation
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = RotatedAugmentedDataset(train_image_list, train_label_list)
    val_dataset = CustomImageDataset(val_data, transform=transform)
    test_dataset = CustomImageDataset(test_data, transform=transform)

    # Only the training loader is shuffled
    dataloader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    dataloader_val = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    dataloader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    print(f"Training Dataset Size: {len(train_dataset)}")
    print(f"Validation Dataset Size: {len(val_dataset)}")
    print(f"Test Dataset Size: {len(test_dataset)}")

    # Sanity check: show the shape and labels of the first training batch
    for images, batch_labels in dataloader_train:
        print(f"Train Batch Image Shape: {images.shape}")
        print(f"Train Batch Labels: {batch_labels}")
        break

    return dataloader_train, dataloader_val, dataloader_test, num_classes

In [None]:
# 모델 정의 함수 (Deep CNN 버전)

import torch.nn as nn
import torch.optim as optim
import torchvision.models as models

def get_model(model_name, num_classes):
    """Create a torchvision model with ``pretrained=True`` and a new output layer.

    This definition replaces the earlier ``get_model`` of this notebook once
    the cell is run, so it also accepts the lightweight models of the first
    experiments; re-running those cells afterwards keeps working.

    Args:
        model_name: One of 'resnet50', 'resnet152', 'vgg16_bn', 'densenet201',
            'seresnext101', 'convnext_base', 'efficientnet_b0',
            'mobilenet_v3_small' or 'shufflenet_v2_x1_0'.
        num_classes: Number of outputs of the replacement linear layer.

    Returns:
        The model with its last linear layer replaced.

    Raises:
        ValueError: If ``model_name`` is not supported.
    """
    if model_name == 'resnet50':
        model = models.resnet50(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif model_name == 'resnet152':
        model = models.resnet152(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif model_name == 'vgg16_bn':
        model = models.vgg16_bn(pretrained=True)
        model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
    elif model_name == 'densenet201':
        model = models.densenet201(pretrained=True)
        model.classifier = nn.Linear(model.classifier.in_features, num_classes)
    elif model_name == 'seresnext101':
        # torchvision has no SE-ResNeXt, so a similar architecture is used instead
        model = models.resnext101_32x8d(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    elif model_name == 'convnext_base':
        model = models.convnext_base(pretrained=True)
        model.classifier[2] = nn.Linear(model.classifier[2].in_features, num_classes)
    elif model_name == 'efficientnet_b0':
        model = models.efficientnet_b0(pretrained=True)
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
    elif model_name == 'mobilenet_v3_small':
        model = models.mobilenet_v3_small(pretrained=True)
        model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
    elif model_name == 'shufflenet_v2_x1_0':
        model = models.shufflenet_v2_x1_0(pretrained=True)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
    else:
        raise ValueError(f"Unsupported model: {model_name}")

    return model


In [None]:
# 평가 함수(학습 함수는 초기 실험 함수 사용)

import os
import re
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from torch import nn
from torchvision import models
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

# Root directories: training artefacts (logs, checkpoints) live under train_root,
# evaluation outputs (plots, metrics) are written under test_root.
train_root = "/content/drive/MyDrive/KTB/personal_mission/train_dir"
test_root = "/content/drive/MyDrive/KTB/personal_mission/test_dir"

# 모든 txt 파일을 찾는 함수
def find_txt_files(root_dir):
    """Recursively collect the paths of all ``training_log_*.txt`` files.

    Args:
        root_dir: Directory tree to walk.

    Returns:
        List of file paths in ``os.walk`` order.
    """
    return [
        os.path.join(folder, name)
        for folder, _, names in os.walk(root_dir)
        for name in names
        if name.startswith("training_log_") and name.endswith(".txt")
    ]

# txt 파일에서 학습 로그를 읽고 파싱하는 함수
def parse_log(file_path):
    """Parse the per-epoch lines of a training log into a DataFrame.

    Lines of the form
    ``Epoch N/M, Train Loss: x, Val Loss: x, Train Acc: x, Val Acc: x`` are
    extracted; all other lines (header, early-stopping notice) are ignored.
    Values logged as ``nan`` or ``inf`` (e.g. a diverged loss) are kept as
    floats instead of silently dropping the whole epoch.

    Args:
        file_path: Path to a ``training_log_*.txt`` file.

    Returns:
        DataFrame with columns "Epoch", "Train Loss", "Val Loss", "Train Acc"
        and "Val Acc", or ``None`` when no epoch line was found.
    """
    # A metric is either a plain decimal number or nan/inf as printed by ':.4f'
    number = r"(-?(?:[\d.]+|nan|inf))"
    line_pattern = re.compile(
        r"Epoch (\d+)/\d+, Train Loss: " + number
        + r", Val Loss: " + number
        + r", Train Acc: " + number
        + r", Val Acc: " + number
    )

    epochs, train_loss, val_loss, train_acc, val_acc = [], [], [], [], []

    with open(file_path, "r") as f:
        for line in f:
            match = line_pattern.search(line)
            if not match:
                continue
            epoch, *metrics = match.groups()
            t_loss, v_loss, t_acc, v_acc = map(float, metrics)
            epochs.append(int(epoch))
            train_loss.append(t_loss)
            val_loss.append(v_loss)
            train_acc.append(t_acc)
            val_acc.append(v_acc)

    if not epochs:  # no epoch lines at all
        return None

    return pd.DataFrame({
        "Epoch": epochs,
        "Train Loss": train_loss,
        "Val Loss": val_loss,
        "Train Acc": train_acc,
        "Val Acc": val_acc
    })

# 파일명에서 모델 이름 추출하는 함수
def extract_model_name(file_name):
    """Return the model name embedded in a ``training_log_<model>.txt`` file name.

    Args:
        file_name: File name to inspect.

    Returns:
        The text between ``training_log_`` and ``.txt``, or "Unknown Model"
        when the name does not follow that pattern.
    """
    found = re.search(r"training_log_(.+)\.txt", file_name)
    if found:
        return found.group(1)
    return "Unknown Model"

# Fine-tuned 모델 불러오는 함수
def load_fine_tuned_model(model_name, num_classes, checkpoint_path, device):
    """Rebuild a model architecture and load fine-tuned weights, best effort.

    The backbone is created without pretrained weights because every parameter
    is overwritten by the checkpoint's ``state_dict`` anyway. Besides the deep
    models, the lightweight models of the first experiments are accepted too.

    Args:
        model_name: One of 'resnet50', 'resnet152', 'vgg16_bn', 'densenet201',
            'seresnext101', 'convnext_base', 'efficientnet_b0',
            'mobilenet_v3_small' or 'shufflenet_v2_x1_0'.
        num_classes: Number of outputs of the classifier layer in the checkpoint.
        checkpoint_path: Path to a ``state_dict`` file.
        device: Device the weights are mapped to and the model is moved to.

    Returns:
        The loaded model on ``device``, or ``None`` when anything fails
        (unsupported name, missing or incompatible checkpoint); the error is
        printed rather than raised.
    """
    try:
        if model_name == 'resnet50':
            model = models.resnet50(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'resnet152':
            model = models.resnet152(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'vgg16_bn':
            model = models.vgg16_bn(pretrained=False)
            model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
        elif model_name == 'densenet201':
            model = models.densenet201(pretrained=False)
            model.classifier = nn.Linear(model.classifier.in_features, num_classes)
        elif model_name == 'seresnext101':
            # torchvision has no SE-ResNeXt, so a similar architecture is used instead
            model = models.resnext101_32x8d(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'convnext_base':
            model = models.convnext_base(pretrained=False)
            model.classifier[2] = nn.Linear(model.classifier[2].in_features, num_classes)
        elif model_name == 'efficientnet_b0':
            model = models.efficientnet_b0(pretrained=False)
            model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
        elif model_name == 'mobilenet_v3_small':
            model = models.mobilenet_v3_small(pretrained=False)
            model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
        elif model_name == 'shufflenet_v2_x1_0':
            model = models.shufflenet_v2_x1_0(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        else:
            raise ValueError(f"Unsupported model: {model_name}")

        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        model.to(device)
        return model

    except Exception as e:
        # Deliberate best effort: one broken checkpoint must not abort a whole sweep
        print(f"⚠️ 모델 {model_name} 로드 중 오류 발생: {e}")
        return None

# Confusion Matrix 시각화 및 저장
def plot_confusion_matrix(y_true, y_pred, classes, model_name, save_path):
    """Draw the confusion matrix as a heatmap and save it to ``save_path``.

    Args:
        y_true: True class indices.
        y_pred: Predicted class indices.
        classes: Tick labels used on both axes.
        model_name: Name shown in the figure title.
        save_path: Destination file of the figure.
    """
    matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    # heatmap draws on (and returns) the current axes of the new figure
    ax = sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
    )
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    ax.set_title(f'Confusion Matrix ({model_name})')
    plt.savefig(save_path)
    plt.close()  # saved to disk, not displayed

# 모델 성능 평가 및 Confusion Matrix 저장
def evaluate_model(model_name, dir_path, dataloader, device, num_classes):
    """Evaluate the best checkpoint of a run and save its metrics and confusion matrix.

    Loads ``best_model_<model_name>.pth`` from ``dir_path``, runs it over
    ``dataloader``, prints accuracy and weighted precision/recall/F1, and
    writes ``confusion_matrix_<model_name>.png`` and
    ``metrics_<model_name>.txt`` into the matching ``test_dir`` folder. If the
    model cannot be loaded, nothing is evaluated and the function returns.

    Args:
        model_name: Model whose checkpoint is evaluated.
        dir_path: Run directory under the train root that holds the checkpoint.
        dataloader: Iterable of ``(images, labels)`` test batches.
        device: Device used for inference.
        num_classes: Number of classes of the checkpoint's classifier.
    """
    # Root directories (kept local so the function does not depend on globals)
    train_root = "/content/drive/MyDrive/KTB/personal_mission/train_dir"
    test_root = "/content/drive/MyDrive/KTB/personal_mission/test_dir"

    checkpoint_path = os.path.join(dir_path, f"best_model_{model_name}.pth")
    model = load_fine_tuned_model(model_name, num_classes, checkpoint_path, device)
    if model is None:
        # The loader already reported the failure
        return

    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Class names are the dataset sub-folder names
    root_dir = 'almond/dataset'
    classes = os.listdir(root_dir)

    # Precision/recall/F1 are averaged with average='weighted'
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Precision: {precision:.4f}")
    print(f"Test Recall: {recall:.4f}")
    print(f"Test F1 Score: {f1:.4f}")

    # Map the run directory from train_dir to test_dir
    save_path = dir_path.replace(train_root, test_root)
    os.makedirs(save_path, exist_ok=True)
    cm_save_path = os.path.join(save_path, f"confusion_matrix_{model_name}.png")

    plot_confusion_matrix(all_labels, all_preds, classes, model_name, cm_save_path)

    # Persist the metrics next to the confusion matrix
    metrics_save_path = os.path.join(save_path, f"metrics_{model_name}.txt")
    with open(metrics_save_path, "w") as f:
        f.write(f"Model: {model_name}\n")
        f.write(f"Test Accuracy: {accuracy:.4f}\n")
        f.write(f"Test Precision: {precision:.4f}\n")
        f.write(f"Test Recall: {recall:.4f}\n")
        f.write(f"Test F1 Score: {f1:.4f}\n")

    print(f"📄 {metrics_save_path}에 평가지표 저장 완료!")

# 학습 로그 + Confusion Matrix 통합 저장
def process_single_experiment(dataloader, device, num_classes, experiment_dir):
    """
    Plot the training curves and run the test evaluation for one experiment folder.

    Every ``training_log_*.txt`` file directly inside ``experiment_dir`` is parsed
    with ``parse_log``. For each log that parses (result is not None), a two-panel
    loss/accuracy figure is written to the mirrored output folder
    (``experiment_dir`` with "train_dir" replaced by "test_dir"), and
    ``evaluate_model`` is then called for the model named in the log's file name.

    Args:
        dataloader: forwarded unchanged to ``evaluate_model``.
        device: forwarded unchanged to ``evaluate_model``.
        num_classes: forwarded unchanged to ``evaluate_model``.
        experiment_dir: path of the experiment folder to process.

    Returns:
        None. Returns early, creating nothing, when the folder does not exist
        or holds no training log.
    """
    # Mirror path: train_dir -> test_dir
    output_dir = experiment_dir.replace("train_dir", "test_dir")

    if not os.path.exists(experiment_dir):
        print(f"⚠️ 유효하지 않은 실험 디렉토리: {experiment_dir}")
        return

    # Collect the training logs that live directly in this experiment folder
    log_paths = []
    for entry in os.listdir(experiment_dir):
        if entry.startswith("training_log_") and entry.endswith(".txt"):
            log_paths.append(os.path.join(experiment_dir, entry))

    if len(log_paths) == 0:
        print(f"❌ {experiment_dir}에서 training_log 파일을 찾을 수 없습니다.")
        return

    os.makedirs(output_dir, exist_ok=True)

    # (metric name, train column, validation column); the column names double as legend labels
    panels = (
        ("Loss", "Train Loss", "Val Loss"),
        ("Accuracy", "Train Acc", "Val Acc"),
    )

    for log_path in log_paths:
        history = parse_log(log_path)
        if history is None:
            continue

        model_name = extract_model_name(os.path.basename(log_path))

        # Training-curve figure: loss on the left, accuracy on the right
        plt.figure(figsize=(12, 5))
        for position, (metric, train_col, val_col) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            plt.plot(history["Epoch"], history[train_col], label=train_col, marker="o")
            plt.plot(history["Epoch"], history[val_col], label=val_col, marker="s")
            plt.xlabel("Epoch")
            plt.ylabel(metric)
            plt.title(f"Training & Validation {metric} ({model_name})")
            plt.legend()
            plt.grid(True)

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, f"training_log_{model_name}.png"))
        plt.close()

        # Confusion matrix and test metrics for the same model
        evaluate_model(model_name, experiment_dir, dataloader, device, num_classes)

    print(f"✅ {experiment_dir}의 학습 로그 및 Confusion Matrix가 {output_dir}에 저장되었습니다.")



**Cross Validation K-Fold (DB 연동)**

In [None]:
# CV K-Fold DB Table, DB Update 함수

import sqlite3
from datetime import datetime

def create_experiment_db(db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"):
    """
    Create the SQLite experiment-tracking schema if it does not exist yet.

    Four tables are created with ``CREATE TABLE IF NOT EXISTS``, so calling this
    again on an existing database leaves existing tables untouched:
    ``experiments``, ``experiment_folds``, ``experiment_logs`` and
    ``experiment_tests``.

    Args:
        db_path: path of the SQLite database file.
    """
    table_definitions = (
        # 1. experiments: one row per experiment run
        """
    CREATE TABLE IF NOT EXISTS experiments (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        model_name TEXT,
        total_epochs INTEGER,
        k_fold INTEGER,
        lr REAL,
        patience INTEGER,
        created_at TEXT
    )
    """,
        # 2. experiment_folds: one row per fold of an experiment
        """
    CREATE TABLE IF NOT EXISTS experiment_folds (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        experiment_id INTEGER,
        fold INTEGER,
        train_size INTEGER,
        val_size INTEGER,
        best_model_path TEXT,
        early_stopped_epoch INTEGER,
        log_plot_path TEXT,
        FOREIGN KEY (experiment_id) REFERENCES experiments (id)
    )
    """,
        # 3. experiment_logs: per-epoch training log
        """
    CREATE TABLE IF NOT EXISTS experiment_logs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        experiment_id INTEGER,
        fold INTEGER,
        epoch INTEGER,
        train_loss REAL,
        val_loss REAL,
        train_acc REAL,
        val_acc REAL,
        FOREIGN KEY (experiment_id) REFERENCES experiments (id)
    )
    """,
        # 4. experiment_tests: test-set results
        """
    CREATE TABLE IF NOT EXISTS experiment_tests (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        experiment_id INTEGER,
        fold INTEGER,
        is_best_fold INTEGER DEFAULT 0,
        is_ensemble INTEGER DEFAULT 0,
        test_acc REAL,
        test_precision REAL,
        test_recall REAL,
        test_f1 REAL,
        conf_matrix_path TEXT,
        FOREIGN KEY (experiment_id) REFERENCES experiments (id)
    )
    """,
    )

    connection = sqlite3.connect(db_path)
    db = connection.cursor()
    for ddl in table_definitions:
        db.execute(ddl)

    connection.commit()
    connection.close()
    print("✅ SQLite DB 및 테이블 생성 완료")


def insert_experiment_with_folds(db_path, model_name, total_epochs, k_fold, lr, patience, fold_data_sizes):
    """
    Register a new experiment together with one ``experiment_folds`` row per fold.

    Args:
        db_path: path of the SQLite database file.
        model_name, total_epochs, k_fold, lr, patience: stored as-is in ``experiments``.
        fold_data_sizes: iterable of ``(train_size, val_size)`` pairs, one per fold;
            folds are numbered from 1 in iteration order.

    Returns:
        ``(experiment_id, fold_id_list)``: the new experiment's row id and the row ids
        of the inserted fold rows, in fold order.
    """
    connection = sqlite3.connect(db_path)
    db = connection.cursor()

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # 1. Parent row in `experiments`
    db.execute("""
        INSERT INTO experiments (
            model_name, total_epochs, k_fold, lr, patience, created_at
        ) VALUES (?, ?, ?, ?, ?, ?)
    """, (model_name, total_epochs, k_fold, lr, patience, timestamp))
    experiment_id = db.lastrowid

    # 2. One child row per fold; lastrowid must be read after each insert
    fold_id_list = []
    for fold_number, (train_size, val_size) in enumerate(fold_data_sizes, start=1):
        db.execute("""
            INSERT INTO experiment_folds (
                experiment_id, fold, train_size, val_size
            ) VALUES (?, ?, ?, ?)
        """, (experiment_id, fold_number, train_size, val_size))
        fold_id_list.append(db.lastrowid)

    connection.commit()
    connection.close()

    print(f"✅ 실험 등록 완료 (experiment_id: {experiment_id})")
    return experiment_id, fold_id_list


def insert_epoch_log(db_path, experiment_id, fold, epoch, train_loss, val_loss, train_acc, val_acc):
    """
    Append one per-epoch metrics row to ``experiment_logs``.

    A new connection is opened, the row is inserted and committed, and the
    connection is closed again on every call.
    """
    row = (experiment_id, fold, epoch, train_loss, val_loss, train_acc, val_acc)

    connection = sqlite3.connect(db_path)
    connection.cursor().execute("""
        INSERT INTO experiment_logs (
            experiment_id, fold, epoch, train_loss, val_loss, train_acc, val_acc
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """, row)
    connection.commit()
    connection.close()


def update_fold_info(db_path, experiment_id, fold, best_model_path=None, early_stopped_epoch=None, log_plot_path=None):
    """
    Update the optional columns of one ``experiment_folds`` row.

    Only the arguments that are not ``None`` are written; the others keep their
    stored value. The row is selected by ``experiment_id`` and ``fold``.

    Args:
        db_path: path of the SQLite database file.
        experiment_id: id of the owning experiment.
        fold: fold number of the row to update.
        best_model_path: checkpoint path to store, or None to leave unchanged.
        early_stopped_epoch: epoch number to store, or None to leave unchanged.
        log_plot_path: training-plot path to store, or None to leave unchanged.
    """
    # Column names come from this fixed mapping, never from caller input, so
    # interpolating them into the SQL text below is safe.
    candidates = {
        "best_model_path": best_model_path,
        "early_stopped_epoch": early_stopped_epoch,
        "log_plot_path": log_plot_path,
    }
    # `is not None` (not truthiness) so that legitimate falsy values such as
    # epoch 0 are still written.
    pending = {column: value for column, value in candidates.items() if value is not None}

    conn = sqlite3.connect(db_path)
    try:
        if pending:
            assignments = ", ".join(f"{column} = ?" for column in pending)
            conn.execute(
                f"UPDATE experiment_folds SET {assignments} WHERE experiment_id = ? AND fold = ?",
                (*pending.values(), experiment_id, fold),
            )
        conn.commit()
    finally:
        # Release the connection even when the UPDATE raises
        conn.close()


def insert_test_result(
    db_path,
    experiment_id,
    fold,
    is_best_fold,
    is_ensemble,
    test_acc,
    test_precision,
    test_recall,
    test_f1,
    conf_matrix_path
):
    """
    Append one test-evaluation row to ``experiment_tests``.

    All arguments after ``db_path`` are stored as-is in the column of the same
    name. The connection is opened, committed and closed within the call.
    """
    record = (
        experiment_id, fold, is_best_fold, is_ensemble,
        test_acc, test_precision, test_recall, test_f1, conf_matrix_path,
    )

    connection = sqlite3.connect(db_path)
    db = connection.cursor()
    db.execute("""
        INSERT INTO experiment_tests (
            experiment_id, fold, is_best_fold, is_ensemble,
            test_acc, test_precision, test_recall, test_f1, conf_matrix_path
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, record)
    connection.commit()
    connection.close()


def plot_log_from_db(db_path, experiment_id, fold, model_name, save_path):
    """
    Plot the logged loss and accuracy curves of one fold and save the figure.

    The per-epoch rows of ``experiment_logs`` matching ``experiment_id`` and
    ``fold`` are read in epoch order. If there are none, a message is printed
    and nothing is plotted.

    Args:
        db_path: path of the SQLite database file.
        experiment_id: experiment whose log is plotted.
        fold: fold number whose log is plotted.
        model_name: used in the subplot titles only.
        save_path: destination file of the figure.
    """
    query = """
        SELECT epoch, train_loss, val_loss, train_acc, val_acc
        FROM experiment_logs
        WHERE experiment_id = ? AND fold = ?
        ORDER BY epoch
    """
    connection = sqlite3.connect(db_path)
    history = pd.read_sql_query(query, connection, params=(experiment_id, fold))
    connection.close()

    if history.empty:
        print(f"❌ 로그 데이터가 존재하지 않습니다. (experiment_id={experiment_id}, fold={fold})")
        return

    # (metric, train column, val column, train legend label, val legend label)
    panels = (
        ("Loss", "train_loss", "val_loss", "Train Loss", "Val Loss"),
        ("Accuracy", "train_acc", "val_acc", "Train Acc", "Val Acc"),
    )

    plt.figure(figsize=(12, 5))
    for position, (metric, train_col, val_col, train_label, val_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history['epoch'], history[train_col], label=train_label, marker='o')
        plt.plot(history['epoch'], history[val_col], label=val_label, marker='s')
        plt.title(f"{metric} Curve - {model_name} (Fold {fold})")
        plt.xlabel("Epoch")
        plt.ylabel(metric)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()
    print(f"📈 Fold {fold} 학습 그래프 저장 완료: {save_path}")


def get_best_fold_path(db_path, experiment_id):
    """
    Return the fold with the lowest mean validation loss and its checkpoint path.

    The mean is taken over every logged epoch of each fold in ``experiment_logs``
    (it is an average over the whole run, not the fold's minimum loss).

    Args:
        db_path: path of the SQLite database file.
        experiment_id: experiment to inspect.

    Returns:
        ``(best_fold, best_model_path)``. ``(None, None)`` when the experiment has no
        log rows; ``(best_fold, None)`` when the fold has no ``experiment_folds`` row
        (the path is also None when the stored column is NULL).
    """
    import sqlite3
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # 1. Fold with the smallest average val_loss
        cursor.execute("""
            SELECT fold, AVG(val_loss) as avg_val_loss
            FROM experiment_logs
            WHERE experiment_id = ?
            GROUP BY fold
            ORDER BY avg_val_loss ASC
            LIMIT 1
        """, (experiment_id,))
        best_row = cursor.fetchone()

        if not best_row:
            return None, None

        best_fold = best_row[0]

        # 2. Checkpoint path recorded for that fold
        cursor.execute("""
            SELECT best_model_path FROM experiment_folds
            WHERE experiment_id = ? AND fold = ?
        """, (experiment_id, best_fold))
        path_row = cursor.fetchone()
    finally:
        # Runs on the early return above and on errors, so the connection never leaks
        conn.close()

    return best_fold, (path_row[0] if path_row else None)


In [None]:
# Cross Validation K-Fold 전용 dataloader

import torch

from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

def set_dataloader(train_per_class=6, batch_size=4, num_workers=4):
    """
    Build the train dataset and the train/test loaders used by the K-Fold runs.

    Each sub-folder of the module-level ``root_dir`` is one class; its position in
    ``os.listdir(root_dir)`` is the class index. Per class, the files are shuffled
    with ``random.shuffle``, the first ``train_per_class`` go to the training split
    and all remaining files go to the test split.

    Args:
        train_per_class: number of images per class placed in the training split.
        batch_size: batch size of both loaders.
        num_workers: worker processes of both loaders.

    Returns:
        ``(train_dataset, dataloader_train, dataloader_test, num_classes)``.

    Note:
        The split depends on the global ``random`` state; seed it beforehand for a
        reproducible split.
    """
    # Class list; index in this list is the numeric label
    labels = os.listdir(root_dir)
    label_to_index = {label: idx for idx, label in enumerate(labels)}
    num_classes = len(labels)

    # Per-class (path, label index) lists
    data_list = {label: [] for label in labels}
    for label in labels:
        data_dir = os.path.join(root_dir, label)
        for img_name in os.listdir(data_dir):
            data_list[label].append((os.path.join(data_dir, img_name), label_to_index[label]))

    # Per-class split: fixed-size training part, remainder is the test part
    train_data, test_data = [], []
    for items in data_list.values():
        random.shuffle(items)
        train_data.extend(items[:train_per_class])
        test_data.extend(items[train_per_class:])

    class CustomImageDataset(Dataset):
        """Dataset over ``(image path, label index)`` pairs, decoded with OpenCV as RGB."""

        def __init__(self, data, transform=None):
            self.data = data
            self.transform = transform

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            img_path, label = self.data[idx]
            img = cv2.imread(img_path)
            if img is None:
                # cv2.imread returns None instead of raising; fail with the path
                # rather than with an opaque cvtColor assertion.
                raise ValueError(f"이미지를 읽을 수 없습니다: {img_path}")
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

            if self.transform:
                img = self.transform(img)

            return img, label

    # Resize to 224x224 and normalise with the ImageNet mean/std
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    train_dataset = CustomImageDataset(train_data, transform=transform)
    test_dataset = CustomImageDataset(test_data, transform=transform)

    dataloader_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    dataloader_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)

    print(f"Training Dataset Size: {len(train_dataset)}")
    print(f"Test Dataset Size: {len(test_dataset)}")

    # Sanity check: show the shape and labels of the first training batch
    for images, batch_labels in dataloader_train:
        print(f"Train Batch Image Shape: {images.shape}")
        print(f"Train Batch Labels: {batch_labels}")
        break

    return train_dataset, dataloader_train, dataloader_test, num_classes

In [None]:
# Cross Validation K-Fold 학습 코드

from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm
import torch.optim as optim
import os
import torch.nn as nn
import torch

def train_model_cv_kfold(
    model_name, dataset, num_classes, save_dir, device,
    k=5, epochs=10, lr=0.0001, patience=3,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"
):
    """
    Train ``k`` models, one per stratified fold, in lock-step and log to SQLite.

    All fold models are created up front and advanced together: in every epoch
    each fold model is trained for one epoch and validated, and the metrics are
    written with ``insert_epoch_log``. Early stopping watches the validation loss
    averaged over the folds; whenever that average improves, the current weights
    of all folds are saved to ``save_dir``.

    Args:
        model_name: name passed to ``get_model`` and used in file names.
        dataset: indexable dataset yielding ``(image, label)`` pairs.
        num_classes: passed to ``get_model``.
        save_dir: directory receiving the checkpoints and training plots.
        device: device the models are moved to.
        k: number of folds.
        epochs: maximum number of epochs.
        lr: Adam learning rate.
        patience: epochs without improvement of the average val loss before stopping.
        db_path: path of the SQLite database file.

    Returns:
        The id of the experiment row created for this run.
    """
    # Prepare Stratified K-Fold
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    # Labels only. NOTE(review): this iterates the dataset, so every item's
    # __getitem__ runs once here just to obtain the label.
    targets = [label for _, label in dataset]

    # Collect the index split and the split sizes of every fold
    fold_data_sizes = []
    fold_indices = []
    for train_idx, val_idx in skf.split(dataset, targets):
        fold_data_sizes.append((len(train_idx), len(val_idx)))
        fold_indices.append((train_idx, val_idx))

    # Register the experiment and its fold rows (the returned fold ids are unused)
    experiment_id, _ = insert_experiment_with_folds(
        db_path=db_path,
        model_name=model_name,
        total_epochs=epochs,
        k_fold=k,
        lr=lr,
        patience=patience,
        fold_data_sizes=fold_data_sizes
    )

    # Per-fold model, optimizer and data loaders; all k models exist at the same time
    fold_models, fold_optimizers = [], []
    fold_train_loaders, fold_val_loaders = [], []

    for i, (train_idx, val_idx) in enumerate(fold_indices):
        train_subset = torch.utils.data.Subset(dataset, train_idx)
        val_subset = torch.utils.data.Subset(dataset, val_idx)

        print(f"📊 Fold {i+1} - Train Size: {len(train_subset)}, Validation Size: {len(val_idx)}")

        model = get_model(model_name, num_classes).to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr)

        train_loader = torch.utils.data.DataLoader(train_subset, batch_size=4, shuffle=True)
        val_loader = torch.utils.data.DataLoader(val_subset, batch_size=4, shuffle=False)

        fold_models.append(model)
        fold_optimizers.append(optimizer)
        fold_train_loaders.append(train_loader)
        fold_val_loaders.append(val_loader)

    # Training loop
    best_avg_val_loss = float("inf")
    no_improve_count = 0

    for epoch in tqdm(range(epochs), desc=f"[{model_name} CV-KFold]"):
        # fold_val_accs is filled below but not read afterwards
        fold_val_losses, fold_val_accs = [], []

        for fold in range(k):
            model = fold_models[fold]
            optimizer = fold_optimizers[fold]
            train_loader = fold_train_loaders[fold]
            val_loader = fold_val_loaders[fold]

            # Train
            train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, device)

            # Validate
            val_loss, val_acc = validate_model(model, val_loader, device, nn.CrossEntropyLoss())

            # Store the epoch log (folds and epochs are 1-based in the DB)
            insert_epoch_log(db_path, experiment_id, fold+1, epoch+1, train_loss, val_loss, train_acc, val_acc)

            fold_val_losses.append(val_loss)
            fold_val_accs.append(val_acc)

        # Early stopping is driven by the cross-fold average
        avg_val_loss = sum(fold_val_losses) / k

        if avg_val_loss < best_avg_val_loss:
            best_avg_val_loss = avg_val_loss
            no_improve_count = 0

            # Save every fold's model; the checkpoint reflects the best *average*
            # epoch, not each fold's individually best epoch
            for fold in range(k):
                model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold+1}.pth")
                torch.save(fold_models[fold].state_dict(), model_path)

        else:
            no_improve_count += 1

        if no_improve_count >= patience:
            print(f"🛑 Early stopping at epoch {epoch+1}")
            break

    # Update the per-fold info once training has finished
    for fold in range(k):
        model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold+1}.pth")
        log_plot_path = os.path.join(save_dir, f"training_plot_{model_name}_fold{fold+1}.png")

        # Save the training plot
        plot_log_from_db(
            db_path=db_path,
            experiment_id=experiment_id,
            fold=fold + 1,  # numeric fold index
            model_name=model_name,
            save_path=log_plot_path
        )

        # Update the fold row.
        # NOTE(review): early_stopped_epoch receives the last executed epoch even
        # when the early-stop branch never fired, and `epoch` is unbound if
        # epochs == 0.
        update_fold_info(
            db_path,
            experiment_id,
            fold=fold+1,
            best_model_path=model_path,
            early_stopped_epoch=epoch+1,
            log_plot_path=log_plot_path
        )

    print(f"\n✅ {model_name} 모델의 CV K-Fold 학습 완료 (experiment_id: {experiment_id})")
    return experiment_id

def train_one_epoch(model, dataloader, optimizer, device):
    """
    Run one optimisation pass over ``dataloader`` with cross-entropy loss.

    Args:
        model: network to train; switched to training mode.
        dataloader: sized iterable of ``(images, labels)`` batches.
        optimizer: optimizer stepping the model's parameters.
        device: device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch losses (each batch
        counts equally, whatever its size) and the fraction of correctly
        predicted samples.
    """
    loss_fn = nn.CrossEntropyLoss()
    model.train()

    batch_loss_sum, n_correct, n_seen = 0.0, 0, 0
    for batch_images, batch_labels in dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_images)
        batch_loss = loss_fn(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        batch_loss_sum += batch_loss.item()
        n_seen += batch_labels.size(0)
        n_correct += (logits.argmax(dim=1) == batch_labels).sum().item()

    return batch_loss_sum / len(dataloader), n_correct / n_seen

def validate_model(model, dataloader, device, criterion):
    """
    Evaluate ``model`` on ``dataloader`` without computing gradients.

    Args:
        model: network to evaluate; switched to evaluation mode.
        dataloader: sized iterable of ``(images, labels)`` batches.
        device: device the batches are moved to.
        criterion: loss callable applied to ``(outputs, labels)``.

    Returns:
        ``(avg_loss, accuracy)``: the mean of the per-batch losses and the
        fraction of correctly predicted samples.
    """
    model.eval()

    batch_loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.no_grad():
        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            batch_loss_sum += criterion(logits, batch_labels).item()

            n_seen += batch_labels.size(0)
            n_correct += (logits.argmax(dim=1) == batch_labels).sum().item()

    return batch_loss_sum / len(dataloader), n_correct / n_seen

In [None]:
# 평가 함수

import seaborn as sns
import torch.nn as nn
import torchvision.models as models

from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

# Fine-tuned 모델 불러오는 함수
def load_fine_tuned_model(model_name, num_classes, checkpoint_path, device):
    """
    Rebuild a fine-tuned classifier and load its weights from a checkpoint.

    The architecture is built with its final linear layer resized to
    ``num_classes``; the state dict at ``checkpoint_path`` is then loaded into it.

    Args:
        model_name: one of 'resnet50', 'resnet152', 'vgg16_bn', 'densenet201',
            'seresnext101' or 'convnext_base'.
        num_classes: output size of the replaced classification layer.
        checkpoint_path: file containing the saved ``state_dict``.
        device: device the weights are mapped to and the model is moved to.

    Returns:
        The loaded model, or None when anything fails (unknown name, missing or
        incompatible checkpoint); the error is printed, not raised.
    """
    try:
        # pretrained=False: load_state_dict below replaces every parameter and
        # buffer, so fetching the ImageNet weights first would be wasted work.
        if model_name == 'resnet50':
            model = models.resnet50(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'resnet152':
            model = models.resnet152(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'vgg16_bn':
            model = models.vgg16_bn(pretrained=False)
            model.classifier[6] = nn.Linear(model.classifier[6].in_features, num_classes)
        elif model_name == 'densenet201':
            model = models.densenet201(pretrained=False)
            model.classifier = nn.Linear(model.classifier.in_features, num_classes)
        elif model_name == 'seresnext101':
            # torchvision has no SE-ResNeXt; ResNeXt-101 32x8d is used as a stand-in
            model = models.resnext101_32x8d(pretrained=False)
            model.fc = nn.Linear(model.fc.in_features, num_classes)
        elif model_name == 'convnext_base':
            model = models.convnext_base(pretrained=False)
            model.classifier[2] = nn.Linear(model.classifier[2].in_features, num_classes)
        else:
            # Fail with a clear message instead of an unbound-variable error
            raise ValueError(f"지원하지 않는 모델 이름입니다: {model_name}")

        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
        model.to(device)
        return model

    except Exception as e:
        # Best-effort contract: callers check for None and skip this model
        print(f"⚠️ 모델 {model_name} 로드 중 오류 발생: {e}")
        return None

# Confusion Matrix 시각화 및 저장
def plot_confusion_matrix(y_true, y_pred, classes, model_name, save_path):
    """
    Draw the confusion matrix of ``y_pred`` against ``y_true`` and save it.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels.
        classes: tick labels used on both axes.
        model_name: shown in the figure title.
        save_path: destination file of the figure.
    """
    matrix = confusion_matrix(y_true, y_pred)

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=classes,
        yticklabels=classes,
        ax=ax,
    )
    ax.set_xlabel('Predicted Label')
    ax.set_ylabel('True Label')
    ax.set_title(f'Confusion Matrix ({model_name})')
    fig.savefig(save_path)
    plt.close(fig)

def evaluate_model_and_log(
    experiment_id,
    model_name,
    test_loader,
    num_classes,
    device,
    save_dir,
    fold=None,
    is_best_fold=0,
    is_ensemble=0,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db",
    class_dir='almond/dataset'
):
    """
    Evaluate one fold checkpoint on the test set and record the result.

    Accuracy and weighted precision/recall/F1 are computed, the confusion matrix is
    saved to ``save_dir`` and a row is written with ``insert_test_result``.

    Args:
        experiment_id: experiment the result belongs to.
        model_name: architecture name; also used in file names.
        test_loader: iterable of ``(images, labels)`` test batches.
        num_classes: passed to ``load_fine_tuned_model``.
        device: device used for inference.
        save_dir: directory receiving the confusion-matrix image; an explicitly
            requested fold's checkpoint is also looked up here.
        fold: fold to evaluate. When None (default), the fold returned by
            ``get_best_fold_path`` is used together with its stored checkpoint path.
        is_best_fold: flag stored with the result; also selects the file suffix.
        is_ensemble: flag stored with the result; also selects the file suffix.
        db_path: path of the SQLite database file.
        class_dir: directory whose entries are used as confusion-matrix tick labels.

    Returns:
        None. Failures are printed, not raised.
    """
    try:
        if fold is None:
            fold, model_path = get_best_fold_path(db_path, experiment_id)
        else:
            # Honour an explicitly requested fold instead of overwriting it;
            # same file-name convention as the per-fold checkpoints.
            model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold}.pth")

        # Without a checkpoint path there is nothing to load
        model = None
        if model_path is not None:
            model = load_fine_tuned_model(model_name, num_classes, model_path, device)
        if model is None:
            print(f"❌ 모델 {model_path} 로드 실패")
            return

        model.eval()
        all_preds, all_labels = [], []

        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Class names (tick labels of the confusion matrix)
        class_names = os.listdir(class_dir)

        # Metrics
        acc = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted')
        recall = recall_score(all_labels, all_preds, average='weighted')
        f1 = f1_score(all_labels, all_preds, average='weighted')

        print(f"✅ 평가 완료 - Acc: {acc:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}")

        # Confusion matrix, drawn and saved by the shared helper
        os.makedirs(save_dir, exist_ok=True)
        suffix = "ensemble" if is_ensemble else (f"best_fold{fold}" if is_best_fold else f"fold{fold}")
        cm_path = os.path.join(save_dir, f"conf_matrix_{model_name}_{suffix}.png")
        plot_confusion_matrix(all_labels, all_preds, class_names, model_name, cm_path)

        # Store the result in the DB
        insert_test_result(
            db_path=db_path,
            experiment_id=experiment_id,
            fold=fold if fold is not None else 0,
            is_best_fold=is_best_fold,
            is_ensemble=is_ensemble,
            test_acc=acc,
            test_precision=precision,
            test_recall=recall,
            test_f1=f1,
            conf_matrix_path=cm_path
        )

    except Exception as e:
        # Best-effort: one failing evaluation must not abort the whole model sweep
        print(f"❌ 평가 중 오류 발생: {e}")

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# 각 fold 평가
def evaluate_fold_models(
    experiment_id,
    model_name,
    k,
    save_dir,
    test_loader,
    num_classes,
    device,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"
):
    """
    Evaluate every fold checkpoint on the test set and record each result.

    For folds 1..k the checkpoint ``best_model_{model_name}_fold{fold}.pth`` in
    ``save_dir`` is loaded; folds whose checkpoint cannot be loaded are skipped.
    Each evaluated fold gets a confusion-matrix image in ``save_dir`` and a row
    written with ``insert_test_result`` (``is_best_fold=0``, ``is_ensemble=0``).

    Args:
        experiment_id: experiment the results belong to.
        model_name: architecture name; also used in file names.
        k: number of folds.
        save_dir: directory holding the checkpoints and receiving the images.
        test_loader: iterable of ``(images, labels)`` test batches.
        num_classes: passed to ``load_fine_tuned_model``.
        device: device used for inference.
        db_path: path of the SQLite database file.

    Returns:
        The fold number with the highest test accuracy (the earliest fold wins
        ties), or None when no fold could be evaluated.
    """
    # Start below any possible accuracy so a fold scoring exactly 0.0 can still win
    best_acc = -1.0
    best_fold = None

    # Tick labels of the confusion matrix; listed once, on first use
    class_dir = 'almond/dataset'
    class_names = None

    for fold in range(1, k+1):
        model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold}.pth")
        model = load_fine_tuned_model(model_name, num_classes, model_path, device)
        if model is None:
            continue

        model.eval()
        all_preds, all_labels = [], []

        with torch.no_grad():
            for images, labels in test_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        acc = accuracy_score(all_labels, all_preds)
        precision = precision_score(all_labels, all_preds, average='weighted')
        recall = recall_score(all_labels, all_preds, average='weighted')
        f1 = f1_score(all_labels, all_preds, average='weighted')

        if acc > best_acc:
            best_acc = acc
            best_fold = fold

        if class_names is None:
            class_names = os.listdir(class_dir)

        # Pin the matrix to one row/column per class so it always matches the
        # tick labels, even when a class is absent from labels and predictions
        cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
        plt.figure(figsize=(8, 6))
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
        plt.title(f'Confusion Matrix - Fold {fold}')
        plt.xlabel('Predicted Label')
        plt.ylabel('True Label')

        cm_path = os.path.join(save_dir, f"conf_matrix_{model_name}_fold{fold}.png")
        plt.savefig(cm_path)
        plt.close()

        # Store the result in the DB
        insert_test_result(
            db_path=db_path,
            experiment_id=experiment_id,
            fold=fold,
            is_best_fold=0,
            is_ensemble=0,
            test_acc=acc,
            test_precision=precision,
            test_recall=recall,
            test_f1=f1,
            conf_matrix_path=cm_path
        )

    return best_fold

# Val loss 기준 best fold 모델 평가
def evaluate_best_fold_model(
    experiment_id,
    model_name,
    save_dir,
    test_loader,
    num_classes,
    device,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"
):
    """
    Evaluate the checkpoint of the best fold (by validation loss) on the test set.

    The fold and checkpoint path come from ``get_best_fold_path``. The confusion
    matrix is saved to ``save_dir`` and the metrics are stored with
    ``insert_test_result`` (``is_best_fold=1``). Returns without doing anything
    when the model cannot be loaded.
    """
    best_fold, checkpoint = get_best_fold_path(db_path, experiment_id)
    net = load_fine_tuned_model(model_name, num_classes, checkpoint, device)
    if net is None:
        return

    # Collect predictions and ground truth over the whole test loader
    net.eval()
    predicted, expected = [], []
    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)
            batch_preds = net(batch_images).argmax(dim=1)
            predicted.extend(batch_preds.cpu().numpy())
            expected.extend(batch_labels.cpu().numpy())

    # Keys match the keyword arguments of insert_test_result
    metrics = {
        "test_acc": accuracy_score(expected, predicted),
        "test_precision": precision_score(expected, predicted, average='weighted'),
        "test_recall": recall_score(expected, predicted, average='weighted'),
        "test_f1": f1_score(expected, predicted, average='weighted'),
    }

    # Class names (tick labels of the confusion matrix)
    class_names = os.listdir('almond/dataset')

    # Draw and save the confusion matrix
    matrix = confusion_matrix(expected, predicted)
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(f'Best Fold Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')

    cm_path = os.path.join(save_dir, f"conf_matrix_{model_name}_best_fold{best_fold}.png")
    plt.savefig(cm_path)
    plt.close()

    # Store the result in the DB
    insert_test_result(
        db_path=db_path,
        experiment_id=experiment_id,
        fold=best_fold,
        is_best_fold=1,
        is_ensemble=0,
        conf_matrix_path=cm_path,
        **metrics
    )

import numpy as np

# 앙상블 모델 평가
def evaluate_ensemble_model(
    experiment_id,
    model_name,
    k,
    save_dir,
    test_loader,
    num_classes,
    device,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"
):
    """
    Evaluate the soft-voting ensemble of the fold checkpoints on the test set.

    The checkpoints ``best_model_{model_name}_fold{1..k}.pth`` in ``save_dir`` are
    loaded; those that fail to load are left out. For every batch the softmax
    probabilities of the loaded models are averaged and the arg-max is the
    prediction. The confusion matrix is saved to ``save_dir`` and the metrics
    are stored with ``insert_test_result`` (``fold=0``, ``is_ensemble=1``).

    Args:
        experiment_id: experiment the result belongs to.
        model_name: architecture name; also used in file names.
        k: number of folds.
        save_dir: directory holding the checkpoints and receiving the image.
        test_loader: iterable of ``(images, labels)`` test batches.
        num_classes: passed to ``load_fine_tuned_model``.
        device: device used for inference.
        db_path: path of the SQLite database file.

    Returns:
        None. When no checkpoint can be loaded a message is printed and nothing
        is evaluated or stored.
    """
    models_list = []
    for fold in range(1, k+1):
        model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold}.pth")
        model = load_fine_tuned_model(model_name, num_classes, model_path, device)
        # Explicit None check: a module's truthiness is not a reliable "loaded" signal
        if model is not None:
            model.eval()
            models_list.append(model)

    if not models_list:
        # Averaging over zero models would otherwise fail with `None / 0`
        print(f"❌ 앙상블에 사용할 모델을 불러오지 못했습니다. (experiment_id={experiment_id})")
        return

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.cpu().numpy()

            # Sum the class probabilities of all models, then average
            probs_sum = None
            for model in models_list:
                probs = torch.softmax(model(images), dim=1).cpu().numpy()
                probs_sum = probs if probs_sum is None else probs_sum + probs

            avg_probs = probs_sum / len(models_list)
            preds = np.argmax(avg_probs, axis=1)

            all_preds.extend(preds)
            all_labels.extend(labels)

    acc = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    recall = recall_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    # Class names (tick labels of the confusion matrix)
    class_dir = 'almond/dataset'
    class_names = os.listdir(class_dir)

    # Pin the matrix to one row/column per class so it always matches the tick labels
    cm = confusion_matrix(all_labels, all_preds, labels=list(range(len(class_names))))
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Ensemble Confusion Matrix')
    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')

    cm_path = os.path.join(save_dir, f"conf_matrix_{model_name}_ensemble.png")
    plt.savefig(cm_path)
    plt.close()

    # Store the result in the DB
    insert_test_result(
        db_path=db_path,
        experiment_id=experiment_id,
        fold=0,  # an ensemble has no single fold
        is_best_fold=0,
        is_ensemble=1,
        test_acc=acc,
        test_precision=precision,
        test_recall=recall,
        test_f1=f1,
        conf_matrix_path=cm_path
    )



In [None]:
# Fine-tuning
# NOTE(review): the K-Fold dataloader cell above defines set_dataloader() returning
# (train_dataset, dataloader_train, dataloader_test, num_classes). If that cell was
# the last one to define the name, the unpacking below binds a Dataset to
# `dataloader_train` and the train loader to `dataloader_val` — confirm which
# definition is active before running this section.
dataloader_train, dataloader_val, dataloader_test, num_classes = set_dataloader()

save_dir = create_next_directory('/content/drive/MyDrive/KTB/personal_mission/train_dir')
for model_name in ['resnet50', 'resnet152', 'vgg16_bn', 'densenet201', 'seresnext101', 'convnext_base']:
    print(f"Training {model_name}...")
    train_model(model_name, dataloader_train, dataloader_val, num_classes, save_dir, device, epochs=200, lr=0.00005, patience=10)
# Plot the training logs and evaluate every trained model on the test loader
process_single_experiment(dataloader_test, device, num_classes, save_dir)


# Augmentation + Fine-tuning
dataloader_train, dataloader_val, dataloader_test, num_classes = set_aug_dataloader()

save_dir = create_next_directory('/content/drive/MyDrive/KTB/personal_mission/aug_train_dir')
for model_name in ['resnet50', 'resnet152', 'vgg16_bn', 'densenet201', 'seresnext101', 'convnext_base']:
    print(f"Training {model_name}...")
    train_model(model_name, dataloader_train, dataloader_val, num_classes, save_dir, device, epochs=200, lr=0.00005, patience=10)
process_single_experiment(dataloader_test, device, num_classes, save_dir)

# C-V K-Fold + Fine-tuning
# Uses the 4-tuple set_dataloader() from the K-Fold dataloader cell.
train_dataset, dataloader_train, dataloader_test, num_classes = set_dataloader()

# NOTE(review): the directory name says "aug", but this section calls
# set_dataloader(), not set_aug_dataloader() — confirm the intended name.
save_dir = create_next_directory('/content/drive/MyDrive/KTB/personal_mission/k-fold_aug_train_dir')
for model_name in ['resnet50', 'resnet152', 'vgg16_bn', 'densenet201', 'seresnext101', 'convnext_base']:
    print(f"Training {model_name}...")
    experiment_id =  train_model_cv_kfold(model_name, train_dataset, num_classes, save_dir, device, k=3, epochs=200, lr=0.00005, patience=10)
    print(f"Testing {model_name}...")
    # Four evaluations per model: best fold (default flags), every fold, best fold
    # (flagged is_best_fold), and the ensemble of all folds. k is 3 here, matching training.
    evaluate_model_and_log(experiment_id, model_name, dataloader_test, num_classes, device, save_dir)
    evaluate_fold_models(experiment_id, model_name, 3, save_dir, dataloader_test, num_classes, device, db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db")
    evaluate_best_fold_model(experiment_id, model_name, save_dir, dataloader_test, num_classes, device, db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db")
    evaluate_ensemble_model(experiment_id, model_name, 3, save_dir, dataloader_test, num_classes, device, db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db")

**Data Augmentation + Cross Validation K-Fold**

In [None]:
# Augmentation 포함 Dataset 클래스

from PIL import Image
from torchvision.transforms.functional import rotate

class RotatedAugmentedDataset(Dataset):
    """
    View over a subset of a dataset that yields every sample at four rotations.

    Item ``i`` is sample ``indices[i // 4]`` of ``original_dataset`` rotated by
    ``angles[i % 4]`` degrees (0, 90, 180, 270), so the view is four times as long
    as ``indices``.

    Args:
        original_dataset: indexable dataset whose items start with ``(image, label)``.
        indices: positions in ``original_dataset`` that belong to this subset
            (e.g. the training indices of one fold).
    """

    def __init__(self, original_dataset, indices):
        self.original_dataset = original_dataset
        self.indices = indices
        self.angles = [0, 90, 180, 270]

    def __len__(self):
        return len(self.indices) * len(self.angles)

    def __getitem__(self, idx):
        n_angles = len(self.angles)

        # Map the position in this view to a position in the underlying dataset.
        # Going through self.indices is essential: indexing original_dataset with
        # idx // 4 directly would read samples outside the subset (e.g. the
        # fold's validation samples).
        sample_idx = self.indices[idx // n_angles]
        angle = self.angles[idx % n_angles]

        # Fetch once; every access may decode the image again
        sample = self.original_dataset[sample_idx]
        image, label = sample[0], sample[1]

        return rotate(image, angle), label

In [None]:
# Augmentation + Cross Validation K-Fold 학습 코드

from sklearn.model_selection import StratifiedKFold
from tqdm import tqdm
import torch.optim as optim
import os
import torch.nn as nn
import torch

def train_model_cv_kfold(
    model_name, dataset, num_classes, save_dir, device,
    k=5, epochs=10, lr=0.0001, patience=3,
    db_path="/content/drive/MyDrive/KTB/personal_mission/experiment_log.db"
):
    """Train ``k`` fold models in lockstep with rotation augmentation and shared early stopping.

    The dataset is split with a stratified K-Fold (shuffled, ``random_state=42``). For each
    fold a separate model and Adam optimizer are created; the training part is wrapped in
    ``RotatedAugmentedDataset`` and the validation part in a plain ``Subset``. In every epoch
    each fold model is trained for one epoch and validated, and the per-fold metrics are
    logged through ``insert_epoch_log``. Early stopping watches the validation loss averaged
    over all folds: whenever that average improves, the state dict of every fold model is
    saved; after ``patience`` consecutive epochs without improvement training stops.

    Args:
        model_name: Architecture name passed to ``get_model``; also used in file names.
        dataset: Dataset yielding ``(image, label)`` pairs. It is iterated once in full to
            collect the labels for stratification.
        num_classes: Number of output classes passed to ``get_model``.
        save_dir: Directory receiving the fold checkpoints and training plots; created if
            it does not exist.
        device: Device the fold models are moved to and trained on.
        k: Number of folds.
        epochs: Maximum number of epochs.
        lr: Learning rate for each fold's Adam optimizer.
        patience: Number of consecutive non-improving epochs tolerated before stopping.
        db_path: Path handed to the experiment-logging helpers.

    Returns:
        The experiment id returned by ``insert_experiment_with_folds``.
    """
    # Prepare the stratified K-Fold split.
    skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=42)
    # Extract labels only. NOTE: this reads every sample of the dataset once.
    targets = [label for _, label in dataset]

    # Materialise the split so sizes can be logged up front and the same indices reused.
    fold_indices = list(skf.split(dataset, targets))
    fold_data_sizes = [(len(train_idx), len(val_idx)) for train_idx, val_idx in fold_indices]

    # Register the experiment together with its folds.
    experiment_id, _ = insert_experiment_with_folds(
        db_path=db_path,
        model_name=model_name,
        total_epochs=epochs,
        k_fold=k,
        lr=lr,
        patience=patience,
        fold_data_sizes=fold_data_sizes
    )

    # Checkpoints are written below; make sure the target directory exists.
    os.makedirs(save_dir, exist_ok=True)

    # Build one model, optimizer and pair of data loaders per fold.
    fold_models, fold_optimizers = [], []
    fold_train_loaders, fold_val_loaders = [], []

    for i, (train_idx, val_idx) in enumerate(fold_indices):
        augmented_train_dataset = RotatedAugmentedDataset(dataset, train_idx)
        val_subset = torch.utils.data.Subset(dataset, val_idx)

        print(f"📊 Fold {i+1} - Augmented Train Dataset Size: {len(augmented_train_dataset)} (Original: {len(train_idx)}), Validation Size: {len(val_idx)}")

        show_augmented_images(augmented_train_dataset)

        model = get_model(model_name, num_classes).to(device)
        optimizer = optim.Adam(model.parameters(), lr=lr)

        train_loader = torch.utils.data.DataLoader(augmented_train_dataset, batch_size=4, shuffle=True)
        val_loader = torch.utils.data.DataLoader(val_subset, batch_size=4, shuffle=False)

        fold_models.append(model)
        fold_optimizers.append(optimizer)
        fold_train_loaders.append(train_loader)
        fold_val_loaders.append(val_loader)

    # The loss module is stateless, so one instance serves every fold and epoch.
    criterion = nn.CrossEntropyLoss()

    # Training loop.
    best_avg_val_loss = float("inf")
    no_improve_count = 0
    # Number of epochs actually executed; stays 0 when ``epochs`` is 0 so the
    # bookkeeping after the loop never touches an undefined loop variable.
    epochs_run = 0

    for epoch in tqdm(range(epochs), desc=f"[{model_name} CV-KFold]"):
        epochs_run = epoch + 1
        fold_val_losses = []

        for fold in range(k):
            model = fold_models[fold]
            optimizer = fold_optimizers[fold]
            train_loader = fold_train_loaders[fold]
            val_loader = fold_val_loaders[fold]

            # Train for one epoch.
            train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, device)

            # Validate.
            val_loss, val_acc = validate_model(model, val_loader, device, criterion)

            # Persist this fold's metrics for the epoch (fold and epoch are 1-based).
            insert_epoch_log(db_path, experiment_id, fold+1, epoch+1, train_loss, val_loss, train_acc, val_acc)

            fold_val_losses.append(val_loss)

        # Early stopping is driven by the cross-fold average validation loss.
        avg_val_loss = sum(fold_val_losses) / k

        if avg_val_loss < best_avg_val_loss:
            best_avg_val_loss = avg_val_loss
            no_improve_count = 0

            # Snapshot every fold model at the best average epoch.
            for fold in range(k):
                model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold+1}.pth")
                torch.save(fold_models[fold].state_dict(), model_path)

        else:
            no_improve_count += 1

        if no_improve_count >= patience:
            print(f"🛑 Early stopping at epoch {epoch+1}")
            break

    # After training, store the plot and final bookkeeping for every fold.
    for fold in range(k):
        model_path = os.path.join(save_dir, f"best_model_{model_name}_fold{fold+1}.pth")
        log_plot_path = os.path.join(save_dir, f"training_plot_{model_name}_fold{fold+1}.png")

        # Save the training curve.
        plot_log_from_db(
            db_path=db_path,
            experiment_id=experiment_id,
            fold=fold + 1,  # numeric, 1-based fold index
            model_name=model_name,
            save_path=log_plot_path
        )

        # Update the fold record. ``early_stopped_epoch`` is the last epoch that ran,
        # whether or not early stopping triggered.
        update_fold_info(
            db_path,
            experiment_id,
            fold=fold+1,
            best_model_path=model_path,
            early_stopped_epoch=epochs_run,
            log_plot_path=log_plot_path
        )

    print(f"\n✅ {model_name} 모델의 CV K-Fold 학습 완료 (experiment_id: {experiment_id})")
    return experiment_id
