<a href="https://colab.research.google.com/github/bisil2/AI1_Final_Project/blob/main/%EC%9D%B8%EA%B3%B5%EC%A7%80%EB%8A%A51_%EB%AA%A8%EB%8D%B8_%ED%95%99%EC%8A%B5(Inception_v4_%EC%82%AC%EC%A0%84%ED%95%99%EC%8A%B5).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 02) 정상/감염 이진분류 모델 학습(ResNet50)

In [None]:
use_background = 1
use_agumentation = 1
use_dropout = 1
use_freezing = 1
use_l2 = 0

## 라이브러리 설치 및 Import

In [None]:
''' 패키지 설치 '''
!pip install torch torchvision

# =============================
# torch : 모델 실행, 학습, 추론에 필수적인 PyTorch 프레임워크
# torchvision : 이미지 처리 관련 도구 제공
# =============================



In [None]:
'''Google Drive 연동'''
from google.colab import drive
drive.mount('/content/drive')

'''필수 라이브러리 import'''
import pandas as pd
import numpy as np
import shutil
import zipfile
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import torch.nn.functional as F

Mounted at /content/drive


## 데이터 불러오기

In [None]:
'''Label 데이터 불러오기'''
rawdata = pd.read_csv("/content/drive/MyDrive/data/rawdata.csv")

In [None]:
'''Image 데이터 불러오기'''
if use_background:
  zipName = 'data'
else:
  zipName = 'raw'

# data 폴더 생성
targetPath = '/content/data/'
os.makedirs(targetPath, exist_ok=True)

# 압축 파일 content로 복사 => content에 있으면 처리속도가 비교적 빠름
rootZip = f'/content/drive/MyDrive/data/{zipName}.zip'
targetZip = f'/content/{zipName}.zip'
shutil.copyfile(rootZip, targetZip)

# zipfile.ZipFile로 압축 파일을 열고, 압축된 모든 파일을 targetPath로 이동(압축 해제)
with zipfile.ZipFile(targetZip, 'r') as zip_ref:
  zip_ref.extractall(targetPath)

## 데이터 Split

In [None]:
''' Train:Val:Test = 6:2:2 분할'''
# rawdata를 분할. train:(val+test) = 6:4
train, temp = train_test_split(rawdata, test_size=0.4, random_state=1)

# rawdata를 분할. val:test = 5:5
val, test = train_test_split(temp, test_size=0.5, random_state=1)

## 데이터셋 클래스, Transform 생성

In [None]:
''' DataSet 클래스 정의 '''
# =============================
# 목적 : DataFrame에 저장된 이미지 경로와 라벨 등을 불러오고, 전처리 후 (image, label) 리턴
# 매개변수(?)
#  - dataframe : 이미지 파일 이름/클래스 등이 포함된 변수
#  - rootDir : 이미지들이 저장된 경로
#  - transform : torchvision.transforms를 사용한 이미지 전처리 파이프라인(전처리 묶음? 정도로 이해하면 될 듯)
# =============================
class CustomDataset(Dataset):
  def __init__(self, dataframe, rootDir, transform=None):
    self.dataframe = dataframe
    self.rootDir = rootDir
    self.transform = transform

  # 데이터셋의 총 샘플 수 반환
  def __len__(self):
    return len(self.dataframe)

  # idx번째 데이터 리턴
  def __getitem__(self, idx):
    # DataFrame의 idx번째 행을 row에 저장
    row = self.dataframe.iloc[idx]
    # 이미지 경로 불러오기(row['image']는 파일명이므로, 상위 경로와 더해줌; 파일 이름이 '.JPG'로 된 경우도 있어서 통일)
    imgName = os.path.join(self.rootDir, row['image'].replace('.JPG', '.jpg'))

    # 이미지 파일을 열고, RGB로 변환
    image = Image.open(imgName).convert('RGB')

    # trasform이 정의되어 있다면, 전처리 적용
    if self.transform:
        image = self.transform(image)

    # 라벨 저장
    label = row['class']

    # (전처리된 이미지, 라벨) 리턴
    return image, label

In [None]:
'''Transform 정의'''
# train, val, test 용도에 따라 전처리를 다르게 적용
if use_agumentation:
  transform = {
      'train': transforms.Compose([
          # 이미지를 224x224로 통일 (ResNet50 입력 크기)
          transforms.Resize((224, 224)),
          # 확률적으로 이미지를 좌우 반전 (데이터 증강)
          transforms.RandomHorizontalFlip(),
          # -15 ~ +15도 사이로 랜덤 회전 (데이터 증강)
          transforms.RandomRotation(15),
          # 밝기, 대비, 채도 랜덤 조절 (데이터 증강)
          transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
          # 10 % 비율로 좌우 또는 상화 랜덤 이동 (데이터 증강)
          transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
          # PIL 이미지를 Tensor 형식으로 변환 (0~255 >> 0~1 실수형)
          transforms.ToTensor(),
          # 평균 및 표준편차를 기준으로 정규화 (Image Net으로 사전 학습도니 모델과 일치시키기 위해)
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ]),
      'val': transforms.Compose([
          # 이미지를 224x224로 통일 (ResNet50 입력 크기)
          transforms.Resize((224, 224)),
          # PIL 이미지를 Tensor 형식으로 변환 (0~255 >> 0~1 실수형)
          transforms.ToTensor(),
          # 평균 및 표준편차를 기준으로 정규화 (Image Net으로사전 학습도니 모델과 일치시키기 위해)
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ]),
      'test': transforms.Compose([ # val과 동일/ train과 달리 평가 용도기 때문에 val과 test에는 데이터 증강이 없음.
          transforms.Resize((224, 224)),
          transforms.ToTensor(),
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ])
  }
else:
  transform = {
      'train': transforms.Compose([
          # 이미지를 224x224로 통일 (ResNet50 입력 크기)
          transforms.Resize((224, 224)),
          # PIL 이미지를 Tensor 형식으로 변환 (0~255 >> 0~1 실수형)
          transforms.ToTensor(),
          # 평균 및 표준편차를 기준으로 정규화 (Image Net으로 사전 학습도니 모델과 일치시키기 위해)
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ]),
      'val': transforms.Compose([
          # 이미지를 224x224로 통일 (ResNet50 입력 크기)
          transforms.Resize((224, 224)),
          # PIL 이미지를 Tensor 형식으로 변환 (0~255 >> 0~1 실수형)
          transforms.ToTensor(),
          # 평균 및 표준편차를 기준으로 정규화 (Image Net으로사전 학습도니 모델과 일치시키기 위해)
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ]),
      'test': transforms.Compose([ # val과 동일/ train과 달리 평가 용도기 때문에 val과 test에는 데이터 증강이 없음.
          transforms.Resize((224, 224)),
          transforms.ToTensor(),
          transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ])
  }

In [None]:
# Dataset 불러오기
trainDataset = CustomDataset(train, '/content/data', transform=transform['train'])
valDataset = CustomDataset(val, '/content/data', transform=transform['val'])
testDataset = CustomDataset(test, '/content/data', transform=transform['test'])

# DataLoader 불러오기
trainLoader = DataLoader(trainDataset, batch_size=32, shuffle=False)
valLoader = DataLoader(valDataset, batch_size=32, shuffle=False)
testLoader = DataLoader(testDataset, batch_size=32, shuffle=False)

## 학습, 테스트 메소드 정의

In [None]:
''' ResNet50 모델 생성 및 정의 '''

# 사용할 디바이스 설정 : GPU(cuda)가 가능하면 GPU, 아니면 CPU 사용
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# =============================
# 목적 : 모델을 불러오고 재정의
# =============================
def create_model():
    # 사전 학습된 ResNet50 모델 불러오기
    # ImageNet으로 학습된 가중치를 불러와서 학습 시간을 줄이고 성능 향상을 기대할 수 있음.
    model = models.resnet50(pretrained=True)

    # 모델의 일부 레이어를 freeze : 특정 레이어의 가중치를 고정하여 학습 제외
    # Optimal Layer Selection on Deep Convolutional Neural Networks Using Backward Freezing and Binary Search에서 51개 레이어를 동결하는게 가장 효율이 좋더라~
    # 이유는 데이터가 작기 때문에 과적합을 방지하기 위함 >> 전반부는 묶어두고, 후반부의 레이어만 학습시키기 위해서
    all_params = list(model.named_parameters())

    if use_freezing:
      # 앞의 51개 파라미터는 동결 (requires_grad=False)
      for name, param in all_params[:51]:
          param.requires_grad = False

      # 나머지는 학습 대상
      for name, param in all_params[51:]:
          param.requires_grad = True
    else:
      for name, param in all_params:
          param.requires_grad = True

    # 기존의 fully connected layer는 제거하고, 새로운 분류기로 대체
    # 기존 fc 레이어의 feature 수 불러오기
    num_features = model.fc.in_features
    # 새로운 fc 레이어 정의 (Sequantial)
    if use_dropout:
      model.fc = nn.Sequential(
          # 기존 특성 수 >> 256 차원으로 축소
          nn.Linear(num_features, 256),
          # 활성화 함수
          nn.ReLU(),
          # 일부 뉴런을 50% 확률로 0으로 설정 >> 너무 과적합되면 일반화에 방해가 되기 때문에
          nn.Dropout(0.5),
          # 256 -> 2 차원으로 축소 (2개 클래스 분류를 위한 출력층)
          nn.Linear(256, 2),
      )
    else:
      model.fc = nn.Sequential(
          # 기존 특성 수 >> 256 차원으로 축소
          nn.Linear(num_features, 256),
          # 활성화 함수
          nn.ReLU(),
          # 256 -> 2 차원으로 축소 (2개 클래스 분류를 위한 출력층)
          nn.Linear(256, 2),
      )

    # 모델을 지정한 디바이스로 이동
    return model.to(device)

In [None]:
''' 학습 모델 '''
# =============================
# 목적 : 모델 학습
# 1단계 : 0~51 layer 동결 학습
# 2단계 : 전체 layer 학습
# Early Stopping : val loss가 6번 이상 개선이 안될 경우, 종료
# =============================
def TrainModel(model, criterion, optimizer, scheduler, trainLoader, valLoader, numEpochs=12, patience=6):
    # Early Stopping 관련 변수
    # loss 기준값을 최댓값으로 설정
    best_val_loss = float('inf')
    # early stopping counter 0으로 설정
    counter = 0
    # best model 저장 경로 설정
    best_model_path = f"/content/drive/MyDrive/model/{use_background}{use_agumentation}{use_dropout}{use_freezing}{use_l2}_best_model.pth"

    # Backbone 동결 상태에서 fc layer만 학습
    print("Stage 1: Training fc layer only (Backbone frozen)")
    for epoch in range(numEpochs//2):
        # Train
        # 모델을 학습 모드로 설정
        model.train()
        runningLoss = 0.0
        runningCorrects = 0

        # 훈련 데이터 반복 학습
        for inputs, labels in trainLoader:
            # 이미지와 라벨(클래스)
            inputs, labels = inputs.to(device), labels.to(device)

            # 기존 그래디언트 초기화
            optimizer.zero_grad()
            # 모델 forward pass
            outputs = model(inputs)
            # 손실함수 계산 (출력, 정답)
            loss = criterion(outputs, labels)
            # 역전파 학습
            loss.backward()
            # 파라미터 업데이트
            optimizer.step()

            # 예측값 계산
            _, preds = torch.max(outputs, 1)
            # 손실 합산
            runningLoss += loss.item() * inputs.size(0)
            # 정확도 합산
            runningCorrects += torch.sum(preds == labels.data)

        # 반복 횟수 단위로 Train 평균 손실 및 정확도 계산
        epochLoss = runningLoss / len(trainLoader.dataset)
        epochAcc = runningCorrects.double() / len(trainLoader.dataset)
        print(f"Epoch {epoch+1}/{numEpochs} - Train Loss: {epochLoss:.4f} Acc: {epochAcc:.4f}")

        # Validation
        # 모델을 평가 모드로 설정
        model.eval()
        valLoss = 0.0
        valCorrects = 0

        # 그래디언트 비활성화
        with torch.no_grad():
            for inputs, labels in valLoader:
                # 이미지와 라벨(클래스)
                inputs, labels = inputs.to(device), labels.to(device)
                # 모델 forward pass
                outputs = model(inputs)
                # 손실함수 계산 (출력, 정답)
                loss = criterion(outputs, labels)

                # 예측값 계산
                _, preds = torch.max(outputs, 1)
                # 손실 합산
                valLoss += loss.item() * inputs.size(0)
                # 정확도 합산
                valCorrects += torch.sum(preds == labels.data)

        # 반복 횟수 단위로 Validation 평균 손실 및 정확도 계산
        valLoss /= len(valLoader.dataset)
        val_acc = valCorrects.double() / len(valLoader.dataset)
        print(f"Validation Loss: {valLoss:.4f} Acc: {val_acc:.4f}")

        # Scheduler: Validation Loss 기반으로 학습률 조정
        scheduler.step(valLoss)

        # Early Stopping
        # 개선이 되면
        if valLoss < best_val_loss:
            best_val_loss = valLoss
            counter = 0
            torch.save(model, best_model_path)
            print(f"Saved best model with Val Loss: {best_val_loss:.4f}")
        # 개선이 안되면
        else:
            counter += 1
            print(f"No improvement in {counter}/{patience} epochs")
            if counter >= patience:
                print("Early stopping triggered in Stage 1")
                break

    # Backbone 동결 해제 후 전체 모델 Fine-tuning
    print("\nStage 2: Fine-tuning entire model")
    # Backbone 동결 해제
    for param in model.parameters():
        param.requires_grad = True

    # 옵티마이저의 학습률을 더 작게 설정
    if use_l2:
      optimizer = optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-4)
    else:
      optimizer = optim.Adam(model.parameters(), lr=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

    # 나머지 epoch 동안 학습 (내용은 동일)
    for epoch in range(numEpochs//2, numEpochs):
        # Train
        model.train()
        runningLoss = 0.0
        runningCorrects = 0

        for inputs, labels in trainLoader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            runningLoss += loss.item() * inputs.size(0)
            runningCorrects += torch.sum(preds == labels.data)

        epochLoss = runningLoss / len(trainLoader.dataset)
        epochAcc = runningCorrects.double() / len(trainLoader.dataset)
        print(f"Epoch {epoch+1}/{numEpochs} - Train Loss: {epochLoss:.4f} Acc: {epochAcc:.4f}")

        # Validation
        model.eval()
        valLoss = 0.0
        valCorrects = 0

        with torch.no_grad():
            for inputs, labels in valLoader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                _, preds = torch.max(outputs, 1)
                valLoss += loss.item() * inputs.size(0)
                valCorrects += torch.sum(preds == labels.data)

        valLoss /= len(valLoader.dataset)
        val_acc = valCorrects.double() / len(valLoader.dataset)
        print(f"Validation Loss: {valLoss:.4f} Acc: {val_acc:.4f}")

        scheduler.step(valLoss)

        if valLoss < best_val_loss:
            best_val_loss = valLoss
            counter = 0
            torch.save(model, best_model_path)
            print(f"Saved best model with Val Loss: {best_val_loss:.4f}")
        else:
            counter += 1
            print(f"No improvement in {counter}/{patience} epochs")
            if counter >= patience:
                print("Early stopping triggered in Stage 2")
                break

    print(f"Training completed. Best model saved at: {best_model_path}")

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score, roc_auc_score, confusion_matrix
import numpy as np
import torch.nn.functional as F

def TestModel(model, testLoader):
    all_preds = []
    all_probs = []  # AUC 계산용 softmax 확률
    all_labels = []

    testCorrects = 0
    testLoss = 0.0

    criterion = nn.CrossEntropyLoss()
    model.eval()

    with torch.no_grad():
        for inputs, labels in testLoader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # 예측 클래스
            _, preds = torch.max(outputs, 1)
            # softmax 확률 (클래스 1의 확률만)
            probs = F.softmax(outputs, dim=1)[:, 1]

            testLoss += loss.item() * inputs.size(0)
            testCorrects += torch.sum(preds == labels.data)

            all_preds.extend(preds.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # 평균 계산
    testLoss /= len(testLoader.dataset)
    testAcc = testCorrects.double() / len(testLoader.dataset)
    f1 = f1_score(all_labels, all_preds, average='macro')
    precision = precision_score(all_labels, all_preds, average='macro')
    recall = recall_score(all_labels, all_preds, average='macro')
    auc = roc_auc_score(all_labels, all_probs)

    print(f"Test Accuracy  : {testAcc:.4f}")
    print(f"Test Loss      : {testLoss:.4f}")
    print(f"ROC-AUC        : {auc:.4f}")
    print(f"Precision      : {precision:.4f}")
    print(f"Recall         : {recall:.4f}")
    print(f"F1-macro       : {f1:.4f}")

    return testAcc.item(), testLoss, auc, precision, recall, f1

## 모델 학습 및 테스트

In [None]:
# 모델 불러오기
model = create_model()

# CrossEntropyLoss로 손실 함수 설정
criterion = nn.CrossEntropyLoss()
# Adam으로 옵티마이저로 사용 (filter를 통해 require_grad = True, 동결되지 않은 것만 업데이트)
# weight_decay = 1e-4 : L2 정규화 적용으로 과적합 방지
if use_l2:
  optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001, weight_decay=1e-4)
else:
  optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001)
# 합습률 스케줄러 설정
# 검증 손실이 줄어들지 않으면 learning rate 줄임 (수렴 및 안정화)
# min 모드 : 손실이 최소화되지 않으면 작동 / factor : 학습률을 x배 줄임 / patience : x번 학습동안 개선되지 않으면 발동 / verbose : 메시지 출력
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)

# Train 모델 실행
TrainModel(model, criterion, optimizer, scheduler, trainLoader, valLoader, numEpochs=20, patience=5)



Stage 1: Training fc layer only (Backbone frozen)
Epoch 1/20 - Train Loss: 0.3291 Acc: 0.8618
Validation Loss: 0.3527 Acc: 0.9109
Saved best model with Val Loss: 0.3527
Epoch 2/20 - Train Loss: 0.2106 Acc: 0.9244
Validation Loss: 0.1155 Acc: 0.9457
Saved best model with Val Loss: 0.1155
Epoch 3/20 - Train Loss: 0.1480 Acc: 0.9444
Validation Loss: 0.2237 Acc: 0.9167
No improvement in 1/5 epochs
Epoch 4/20 - Train Loss: 0.1253 Acc: 0.9574
Validation Loss: 0.1666 Acc: 0.9360
No improvement in 2/5 epochs
Epoch 5/20 - Train Loss: 0.1186 Acc: 0.9535
Validation Loss: 0.0845 Acc: 0.9767
Saved best model with Val Loss: 0.0845
Epoch 6/20 - Train Loss: 0.0961 Acc: 0.9638
Validation Loss: 0.0823 Acc: 0.9709
Saved best model with Val Loss: 0.0823
Epoch 7/20 - Train Loss: 0.0924 Acc: 0.9677
Validation Loss: 0.0938 Acc: 0.9787
No improvement in 1/5 epochs
Epoch 8/20 - Train Loss: 0.0745 Acc: 0.9761
Validation Loss: 0.1179 Acc: 0.9671
No improvement in 2/5 epochs
Epoch 9/20 - Train Loss: 0.0739 Acc: 0

In [None]:
print(f"{use_background}{use_agumentation}{use_dropout}{use_freezing}{use_l2}_best_model.pth")
best_model_path = f"/content/drive/MyDrive/model/{use_background}{use_agumentation}{use_dropout}{use_freezing}{use_l2}_best_model.pth"
model = torch.load(best_model_path, weights_only=False)
model.to(device)

# 테스트
result = TestModel(model, testLoader)

11110_best_model.pth
Test Accuracy  : 0.9845
Test Loss      : 0.0450
ROC-AUC        : 0.9985
Precision      : 0.9845
Recall         : 0.9846
F1-macro       : 0.9845
