<a href="https://colab.research.google.com/github/Jaeeyun/cv_final_assignment/blob/main/final_cv_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Google Drive 마운트
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
import pandas as pd
from pathlib import Path
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import time
import copy
import os
import cv2 # cv2는 이미지 로드 시 PIL로 대체하거나 keep 가능
from tqdm import tqdm
import time

torch.cuda.empty_cache()

# --- 설정 ---
TRAIN_DATA_DIR = '/content/drive/MyDrive/cv_final/POC_Dataset/Training'
NUM_CLASSES = 4 # 데이터셋의 클래스 수 (0, 1, 2, 3)
BATCH_SIZE = 32
LEARNING_RATE = 0.0001 # 파인튜닝에 적합한 낮은 학습률
NUM_EPOCHS = 10

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 1. DataFrame 로드 함수 (이전과 동일)
def create_dataframe_and_encode_labels(base_dir):
    base_dir = Path(base_dir)
    image_paths = []
    labels = []
    print(f"'{base_dir}' 디렉토리에서 데이터 로딩 중...")
    for class_dir in sorted(base_dir.iterdir()):
        if class_dir.is_dir() and not class_dir.name.startswith('.'):
            label_name = class_dir.name
            for image_file in class_dir.glob('*'):
                if image_file.suffix.lower() in ['.png', '.jpg', '.jpeg', '.tif', '.tiff']:
                    image_paths.append(str(image_file.resolve()))
                    labels.append(label_name)
    df = pd.DataFrame({'image_path': image_paths, 'label_name': labels})
    le = LabelEncoder()
    df['label'] = le.fit_transform(df['label_name'])
    print(f"\n총 {len(df)}개의 이미지 경로와 라벨을 매핑했습니다.")
    print(f"클래스 매핑: {list(le.classes_)}")
    return df

# 2. DataFrame 로드 및 분할
df = create_dataframe_and_encode_labels(base_dir=TRAIN_DATA_DIR)
train_df, val_df = train_test_split(df, test_size=0.3, stratify=df['label'], random_state=42)
train_df = train_df.reset_index(drop=True)
val_df = val_df.reset_index(drop=True)

# 3. 사용자 정의 Dataset 클래스 (PyTorch 및 PIL 사용)
class PathologyDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx]['image_path']
        label = self.dataframe.iloc[idx]['label']

        image = Image.open(img_path).convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.long)


# 4. PyTorch Transforms 및 DataLoader 설정 (AlexNet 표준)
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),

        transforms.RandomHorizontalFlip(p=0.5), # 수평 뒤집기
        transforms.RandomVerticalFlip(p=0.5),   # 수직 뒤집기
        transforms.RandomRotation(degrees=30),  # -30도에서 +30도 사이 랜덤 회전
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), # 밝기, 대비 등 랜덤 조절

        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

train_dataset = PathologyDataset(train_df, transform=data_transforms['train'])
val_dataset = PathologyDataset(val_df, transform=data_transforms['val'])

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

dataloaders = {'train': train_loader, 'val': val_loader}
dataset_sizes = {'train': len(train_dataset), 'val': len(val_dataset)}


# 5. AlexNet 모델 로드 및 수정
model_ft = models.alexnet(weights=models.AlexNet_Weights.DEFAULT)
num_ftrs = model_ft.classifier[6].in_features # AlexNet 마지막 FC 레이어 입력 특성 수
model_ft.classifier[6] = nn.Linear(num_ftrs, NUM_CLASSES) # 새 출력 레이어로 교체
model_ft = model_ft.to(device)


# 6. 손실 함수, 최적화 도구, 스케줄러 설정
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.Adam(model_ft.parameters(), lr=LEARNING_RATE)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)


def train_model(model, criterion, optimizer, scheduler, num_epochs=NUM_EPOCHS):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            progress_bar = tqdm(dataloaders[phase], desc=f'Epoch {epoch+1} {phase}')

            for inputs, labels in progress_bar:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

                progress_bar.set_postfix(loss=loss.item(), acc=torch.sum(preds == labels.data).item()/inputs.size(0))


            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'학습 완료 시간: {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'최고 검증 정확도: {best_acc:.4f}')
    model.load_state_dict(best_model_wts)
    return model


# 8. 학습 실행
print("\n--- AlexNet 모델 파인튜닝 시작 ---")
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=NUM_EPOCHS)
print("--- 학습 완료 ---")

# 모델 저장
torch.save(model_ft.state_dict(), '/content/drive/MyDrive/cv_final/alexnet_pathology_fine_tuned_model_2.pth')


'/content/drive/MyDrive/cv_final/POC_Dataset/Training' 디렉토리에서 데이터 로딩 중...

총 4167개의 이미지 경로와 라벨을 매핑했습니다.
클래스 매핑: ['Chorionic_villi', 'Decidual_tissue', 'Hemorrhage', 'Trophoblastic_tissue']
Downloading: "https://download.pytorch.org/models/alexnet-owt-7be5be79.pth" to /root/.cache/torch/hub/checkpoints/alexnet-owt-7be5be79.pth


100%|██████████| 233M/233M [00:01<00:00, 159MB/s]



--- AlexNet 모델 파인튜닝 시작 ---
Epoch 1/10
----------


Epoch 1 train: 100%|██████████| 92/92 [07:54<00:00,  5.16s/it, acc=0.25, loss=1.1]


train Loss: 0.5930 Acc: 0.7781


Epoch 1 val: 100%|██████████| 40/40 [03:00<00:00,  4.50s/it, acc=1, loss=0.04]


val Loss: 0.2906 Acc: 0.8793

Epoch 2/10
----------


Epoch 2 train: 100%|██████████| 92/92 [06:23<00:00,  4.16s/it, acc=0.75, loss=0.226]


train Loss: 0.3809 Acc: 0.8549


Epoch 2 val: 100%|██████████| 40/40 [00:50<00:00,  1.26s/it, acc=1, loss=0.0151]


val Loss: 0.3117 Acc: 0.8777

Epoch 3/10
----------


Epoch 3 train: 100%|██████████| 92/92 [06:15<00:00,  4.08s/it, acc=1, loss=0.0105]


train Loss: 0.3350 Acc: 0.8765


Epoch 3 val: 100%|██████████| 40/40 [00:49<00:00,  1.24s/it, acc=1, loss=0.0159]


val Loss: 0.2791 Acc: 0.8897

Epoch 4/10
----------


Epoch 4 train: 100%|██████████| 92/92 [06:15<00:00,  4.09s/it, acc=0.75, loss=0.45]


train Loss: 0.2888 Acc: 0.8916


Epoch 4 val: 100%|██████████| 40/40 [00:50<00:00,  1.27s/it, acc=1, loss=0.0239]


val Loss: 0.2438 Acc: 0.9121

Epoch 5/10
----------


Epoch 5 train: 100%|██████████| 92/92 [06:06<00:00,  3.99s/it, acc=1, loss=0.163]


train Loss: 0.2964 Acc: 0.8796


Epoch 5 val: 100%|██████████| 40/40 [00:51<00:00,  1.28s/it, acc=1, loss=0.0468]


val Loss: 0.2320 Acc: 0.9161

Epoch 6/10
----------


Epoch 6 train: 100%|██████████| 92/92 [06:09<00:00,  4.02s/it, acc=1, loss=0.0316]


train Loss: 0.2688 Acc: 0.8999


Epoch 6 val: 100%|██████████| 40/40 [00:51<00:00,  1.28s/it, acc=1, loss=0.00378]


val Loss: 0.1946 Acc: 0.9185

Epoch 7/10
----------


Epoch 7 train: 100%|██████████| 92/92 [06:10<00:00,  4.02s/it, acc=1, loss=0.0044]


train Loss: 0.2220 Acc: 0.9122


Epoch 7 val: 100%|██████████| 40/40 [00:49<00:00,  1.24s/it, acc=1, loss=0.0171]


val Loss: 0.1940 Acc: 0.9289

Epoch 8/10
----------


Epoch 8 train: 100%|██████████| 92/92 [06:12<00:00,  4.04s/it, acc=1, loss=0.136]


train Loss: 0.1880 Acc: 0.9228


Epoch 8 val: 100%|██████████| 40/40 [00:49<00:00,  1.23s/it, acc=1, loss=0.00526]


val Loss: 0.1672 Acc: 0.9353

Epoch 9/10
----------


Epoch 9 train: 100%|██████████| 92/92 [06:13<00:00,  4.06s/it, acc=1, loss=0.00942]


train Loss: 0.1675 Acc: 0.9314


Epoch 9 val: 100%|██████████| 40/40 [00:51<00:00,  1.28s/it, acc=1, loss=0.00272]


val Loss: 0.1656 Acc: 0.9345

Epoch 10/10
----------


Epoch 10 train: 100%|██████████| 92/92 [06:11<00:00,  4.04s/it, acc=1, loss=0.000546]


train Loss: 0.1550 Acc: 0.9366


Epoch 10 val: 100%|██████████| 40/40 [00:50<00:00,  1.25s/it, acc=1, loss=0.00361]


val Loss: 0.1660 Acc: 0.9329

학습 완료 시간: 74m 28s
최고 검증 정확도: 0.9353
--- 학습 완료 ---


In [None]:
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support
import torch
from torch.utils.data import DataLoader
import numpy as np
import time

TEST_DATA_DIR = '/content/drive/MyDrive/cv_final/POC_Dataset/Testing'

# 학습 시 사용한 create_dataframe_and_encode_labels 함수가 메모리에 있다고 가정
# 학습 시 사용한 df (학습+검증 전체 데이터프레임)를 사용하여 레이블 매핑을 일관되게 유지

# 1. 테스트 데이터셋 DataFrame 생성
print(f"\n--- '{TEST_DATA_DIR}' 디렉토리에서 테스트 데이터 로딩 중 ---")
# create_dataframe_and_encode_labels 함수는 LabelEncoder를 사용하므로 일관된 인코딩 보장
test_df = create_dataframe_and_encode_labels(base_dir=TEST_DATA_DIR)

# 2. 테스트 데이터셋 인스턴스 생성
# 학습 시 정의한 data_transforms['val'] (검증용 변환) 사용
test_dataset = PathologyDataset(test_df, transform=data_transforms['val']) # PathologyDataset 클래스 필요

# 3. 테스트 DataLoader 생성
TEST_BATCH_SIZE = 32
test_loader = DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=False, num_workers=2)


# 4. 테스트 함수 정의
def test_model(model, dataloader, device):
    model.eval() # 모델을 평가 모드로 설정
    predictions = []
    true_labels = []

    print("\n--- 테스트 데이터셋에 대한 예측 수행 중 ---")
    start_time = time.time()

    # gradient 계산을 비활성화하여 메모리 절약 및 속도 향상
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            # 가장 높은 확률을 가진 클래스 인덱스 가져오기
            _, preds = torch.max(outputs, 1)

            # 결과를 CPU로 옮기고 리스트에 저장
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())

    time_elapsed = time.time() - start_time
    print(f"예측 완료 시간: {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")

    return np.array(true_labels), np.array(predictions)

# 5. 예측 실행
# 학습이 완료된 model_ft 변수 사용
labels, preds = test_model(model_ft, test_loader, device)


# 6. 결과 DataFrame에 추가 및 확인
test_df['predicted_label'] = preds
# 숫자 라벨을 원래 클래스 이름으로 다시 매핑 (학습 데이터의 클래스 매핑 순서를 따름)
# 학습 코드에서 사용한 'df' (전체 학습 데이터프레임) 사용
label_map = dict(zip(df['label'], df['label_name']))
test_df['predicted_label_name'] = test_df['predicted_label'].map(label_map)


print("\n--- 예측 결과 확인 (샘플 5개) ---")
print(test_df[['image_path', 'label_name', 'predicted_label_name']].head())


# 7. 성능 평가 및 리포트 출력
if labels is not None and len(labels) == len(preds):
    print("\n--- 전체 테스트 데이터셋 성능 리포트 ---")

    # 총 예측 개수 계산
    total_count = len(labels)
    # 맞은 개수 계산
    correct_count = (preds == labels).sum()
    # 정확도 계산
    accuracy = accuracy_score(labels, preds)

    print(f"총 이미지 개수: {total_count}개")
    print(f"정답 개수: {correct_count}개")
    print(f"정확도 (Accuracy): {accuracy:.4f}\n")

    # 정밀도, 재현율, F1-스코어 계산
    # target_names 클래스 이름 목록
    target_names = list(df['label_name'].unique())
    print(classification_report(labels, preds, target_names=target_names))
else:
    print("\n테스트 데이터셋에 실제 라벨이 없거나 길이가 맞지 않아 성능 리포트를 생성할 수 없습니다.")



--- '/content/drive/MyDrive/cv_final/POC_Dataset/Testing' 디렉토리에서 테스트 데이터 로딩 중 ---
'/content/drive/MyDrive/cv_final/POC_Dataset/Testing' 디렉토리에서 데이터 로딩 중...

총 1511개의 이미지 경로와 라벨을 매핑했습니다.
클래스 매핑: ['Chorionic_villi', 'Decidual_tissue', 'Hemorrhage', 'Trophoblastic_tissue']

--- 테스트 데이터셋에 대한 예측 수행 중 ---
예측 완료 시간: 3m 47s

--- 예측 결과 확인 (샘플 5개) ---
                                          image_path       label_name  \
0  /content/drive/MyDrive/cv_final/POC_Dataset/Te...  Chorionic_villi   
1  /content/drive/MyDrive/cv_final/POC_Dataset/Te...  Chorionic_villi   
2  /content/drive/MyDrive/cv_final/POC_Dataset/Te...  Chorionic_villi   
3  /content/drive/MyDrive/cv_final/POC_Dataset/Te...  Chorionic_villi   
4  /content/drive/MyDrive/cv_final/POC_Dataset/Te...  Chorionic_villi   

  predicted_label_name  
0      Chorionic_villi  
1      Chorionic_villi  
2      Chorionic_villi  
3      Chorionic_villi  
4      Chorionic_villi  

--- 전체 테스트 데이터셋 성능 리포트 ---
총 이미지 개수: 1511개
정답 개수: 1306개
정확도 (Accur