<a href="https://colab.research.google.com/github/SurinSeong/FinalPJT/blob/main/densenet201_vgg19_chin_sagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# train, val, eval 구성 변경
* train : 증강이미지만 (train) 2490개
* test(val + eval) : 원본이미지만 5:5 분할해서 val, eval로 나누기 965개씩

* 추가
    * weight_decay, scheduler(cosine annealing) 추가

1. 1st
        lr = 1e-4, batch_size = 16 (train), weight_decay = 1e-5
        (0.0878) ensemble_chin_0828
        Evaluation Accuracy: 95.34% (batch=16)
                    precision    recall  f1-score   support

                0       0.97      0.96      0.96       645
                1       0.92      0.94      0.93       320

         accuracy                           0.95       965
        macro avg       0.95      0.95      0.95       965
        weighted avg    0.95      0.95      0.95       965

In [None]:
# Mount Google Drive (Colab only); the dataset and checkpoint paths used
# later in this notebook live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset, ConcatDataset
from torchvision.models import DenseNet201_Weights, VGG19_Weights
from torch.optim.lr_scheduler import CosineAnnealingLR

import pandas as pd
from PIL import Image
import cv2

import numpy as np
import time
import os

# 초기 가중치 설정

In [None]:
import numpy as np
import torch

# Use the GPU when one is visible, otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Number of samples per class (index 0 -> class 0, index 1 -> class 1).
# NOTE(review): these counts add up to 965, which matches the size of the
# validation subset printed later in the notebook, not the 3086 training
# images — confirm which split they were measured on.
chin_sagging_class_counts = np.array([632, 333])

# Total number of samples the counts above were taken from.
total_samples = 965

# Inverse-frequency weighting: total / (n_classes * count_per_class),
# so the rarer class receives the larger weight.
chin_sagging_class_weights = np.divide(
    total_samples,
    chin_sagging_class_counts.size * chin_sagging_class_counts,
)

# Hand the weights to PyTorch as float32 on the active device.
chin_sagging_class_weights = torch.from_numpy(chin_sagging_class_weights).float().to(device)

print(chin_sagging_class_weights)

tensor([0.7634, 1.4489])


# 모델 구축

In [None]:
class DenseNet201_VGG19_Ensemble(nn.Module):
    """Two-backbone classifier that fuses DenseNet201 and VGG19 features.

    Both pretrained networks have their own classifier replaced by
    ``nn.Identity`` so they act as feature extractors. Their outputs are
    concatenated and passed through a small fully connected head.

    Args:
        num_classes: number of output logits produced by the head.
    """

    def __init__(self, num_classes):
        super().__init__()

        # DenseNet201 branch: remember the width its classifier expected,
        # then strip the classifier.
        self.densenet = models.densenet201(weights=DenseNet201_Weights.DEFAULT)
        dense_dim = self.densenet.classifier.in_features
        self.densenet.classifier = nn.Identity()

        # VGG19 branch: same idea, the width is read from the first layer of
        # its classifier stack before the stack is removed.
        self.vgg19 = models.vgg19(weights=VGG19_Weights.DEFAULT)
        vgg_dim = self.vgg19.classifier[0].in_features
        self.vgg19.classifier = nn.Identity()

        # Fusion head operating on the concatenated feature vector.
        fused_dim = dense_dim + vgg_dim
        head_layers = [
            nn.Linear(fused_dim, 1024),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, x):
        """Return class logits for the batch ``x``.

        The same input is fed to both backbones; their outputs are joined
        along dim 1 before the fusion head is applied.
        """
        branch_outputs = (self.densenet(x), self.vgg19(x))
        fused = torch.cat(branch_outputs, dim=1)
        return self.classifier(fused)

In [None]:
####################
# 모델 학습 클래스 #
####################

## 모델 훈련 클래스 ##
class ModelTrainer:
    """Train a classifier with per-epoch validation, early stopping and
    best-checkpoint saving.

    Args:
        model: network to optimise; it is moved to ``device`` on construction.
        train_loader: iterable of ``(images, labels)`` batches used for training.
        val_loader: iterable of ``(images, labels)`` batches used for validation.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: optimizer whose ``zero_grad()``/``step()`` run once per batch.
        scheduler: LR scheduler stepped once per epoch; ``get_last_lr()`` is
            read for logging.
        device: device that the model and every batch are moved to.
    """

    def __init__(self, model, train_loader, val_loader, criterion, optimizer, scheduler, device):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.scheduler = scheduler
        self.device = device

    def _train_one_epoch(self):
        """Run one optimisation pass; return (mean batch loss, accuracy in %)."""
        # Must be re-enabled every epoch: validation switches the model to
        # eval mode, which would otherwise leave dropout/batch-norm frozen
        # for all following training epochs.
        self.model.train()

        running_loss = 0.0
        correct = 0
        seen = 0

        for images, labels in self.train_loader:
            images, labels = images.to(self.device), labels.to(self.device)

            self.optimizer.zero_grad()
            outputs = self.model(images)
            _, train_preds = torch.max(outputs, 1)  # predicted class per sample
            seen += labels.size(0)
            correct += (train_preds == labels).sum().item()

            loss = self.criterion(outputs, labels)
            loss.backward()
            self.optimizer.step()

            running_loss += loss.item()

        return running_loss / len(self.train_loader), correct / seen * 100

    def _validate(self):
        """Evaluate on the validation loader; return (mean batch loss, accuracy in %)."""
        self.model.eval()

        running_loss = 0.0
        correct = 0
        seen = 0

        with torch.no_grad():
            for images, labels in self.val_loader:
                images, labels = images.to(self.device), labels.to(self.device)

                outputs = self.model(images)
                _, preds = torch.max(outputs, 1)  # predicted class per sample
                seen += labels.size(0)
                correct += (preds == labels).sum().item()

                running_loss += self.criterion(outputs, labels).item()

        return running_loss / len(self.val_loader), correct / seen * 100

    def train_and_val(self, num_epochs, save_dir, today):
        """Train for up to ``num_epochs`` epochs, validating after each one.

        The model's ``state_dict`` is written to
        ``f'{save_dir}ensemble_chin_{today}.pt'`` whenever the validation loss
        is at least as good as the best seen so far. Training stops early once
        ``EarlyStopping(patience=10)`` reports no improvement.

        Args:
            num_epochs: maximum number of epochs.
            save_dir: directory prefix for the checkpoint (concatenated as-is,
                so it should end with a path separator).
            today: tag embedded in the checkpoint file name.
        """
        early_stopping = EarlyStopping(patience=10, verbose=True)
        valid_loss_min = np.inf

        for epoch in range(num_epochs):
            start = time.time()

            train_loss, train_accuracy = self._train_one_epoch()

            self.scheduler.step()
            current_lr = self.scheduler.get_last_lr()[0]

            # Report training loss / accuracy (elapsed time covers training only)
            print(f'Epoch [{epoch+1}/{num_epochs}], Time: {float(time.time()-start):.1f}s, Learning rate: {current_lr}\nTraining Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%')

            valid_loss, valid_accuracy = self._validate()

            # Report validation loss / accuracy
            print(f"Validation Loss: {valid_loss:.4f}, Accuracy: {valid_accuracy:.2f}%")

            # Early-stopping check
            early_stopping(valid_loss)

            if early_stopping.early_stop:
                print('Early stopping')
                break

            # Checkpoint the trainer's own model (not a notebook-global one)
            if valid_loss <= valid_loss_min:
                print(f'Validation loss decreased ({valid_loss_min:.4f} --> {valid_loss:.4f}). Saving model...')
                torch.save(self.model.state_dict(), f'{save_dir}ensemble_chin_{today}.pt')
                valid_loss_min = valid_loss

####################
# 모델 평가 클래스 #
####################

# 모델 평가
class ModelEvaluator:
    """Rebuild the ensemble from a saved ``state_dict`` and score it on a loader.

    Args:
        best_model_state_dict: ``state_dict`` to load into a fresh
            ``DenseNet201_VGG19_Ensemble``.
        eval_loader: iterable of ``(images, labels)`` batches.
        criterion: stored for interface compatibility; ``evaluate`` does not
            use it.
        device: device the model and batches are moved to.
        num_classes: optional number of output classes. When omitted it is
            inferred from the last ``classifier.*.weight`` tensor in the
            state dict, so no notebook-global ``num_classes`` is needed.
    """

    def __init__(self, best_model_state_dict, eval_loader, criterion, device, num_classes=None):
        self.best_model_state_dict = best_model_state_dict
        self.eval_loader = eval_loader
        self.criterion = criterion
        self.device = device
        self.num_classes = num_classes

    def _resolve_num_classes(self):
        """Return the explicit class count, or read it from the checkpoint head."""
        if self.num_classes is not None:
            return self.num_classes

        # state_dict keys keep registration order, so the last weight under
        # the top-level ``classifier`` belongs to the final Linear layer.
        head_weight_keys = [
            key for key in self.best_model_state_dict
            if key.startswith('classifier.') and key.endswith('.weight')
        ]
        if not head_weight_keys:
            raise ValueError('Cannot infer num_classes from the state dict; pass num_classes explicitly.')
        return int(self.best_model_state_dict[head_weight_keys[-1]].shape[0])

    def evaluate(self):
        """Run inference over ``eval_loader`` and print the accuracy.

        Returns:
            ``(eval_pred, eval_true)``: two lists holding the predicted and
            the ground-truth labels, in loader order.
        """
        model = DenseNet201_VGG19_Ensemble(self._resolve_num_classes())
        model.load_state_dict(self.best_model_state_dict)

        model.to(self.device).eval()

        eval_correct = 0
        eval_total = 0

        eval_true = []
        eval_pred = []

        with torch.no_grad():
            for images, labels in self.eval_loader:
                images, labels = images.to(self.device), labels.to(self.device)

                outputs = model(images)
                _, eval_preds = torch.max(outputs, 1)  # predicted class per sample

                eval_true.extend(labels.cpu().numpy())
                eval_pred.extend(eval_preds.cpu().numpy())

                eval_total += labels.size(0)
                eval_correct += (eval_preds == labels).sum().item()

        eval_accuracy = eval_correct / eval_total * 100

        print(f"Evaluation Accuracy: {eval_accuracy:.2f}%")

        return eval_pred, eval_true

#############
# 조기 종료 #
#############

class EarlyStopping:
    """Track validation loss and raise a flag when it stops improving.

    Call the instance with the validation loss after each epoch. When the loss
    fails to improve for ``patience`` consecutive calls, ``early_stop``
    becomes ``True``.

    Args:
        patience: number of consecutive non-improving calls tolerated.
        verbose: when true, print the counter on every non-improving call.
        delta: margin added to the best score before comparing.
    """

    def __init__(self, patience=5, verbose=False, delta=0):
        self.patience, self.verbose, self.delta = patience, verbose, delta
        self.best_score = None   # negated best loss seen so far
        self.counter = 0         # consecutive calls without improvement
        self.early_stop = False

    def __call__(self, val_loss):
        # Work with the negated loss so that "higher is better".
        score = -val_loss

        # The first observation only establishes the baseline.
        if self.best_score is None:
            self.best_score = score
            return

        no_improvement = score < self.best_score + self.delta
        if not no_improvement:
            # Improvement: remember it and restart the patience window.
            self.best_score = score
            self.counter = 0
            return

        self.counter += 1
        if self.verbose:
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
        if self.counter >= self.patience:
            self.early_stop = True


In [None]:
##########################
# 데이터셋 및 데이터로더 #
##########################

## 데이터셋 정의 ##
class AnnotationDataset(Dataset):
    """Image dataset whose labels come from one column of an annotation CSV.

    Every ``.jpg`` file found directly inside ``image_dirs`` is matched to a
    CSV row through the integer prefix of its file name (the text before the
    first ``_``) and the CSV's ``ID`` column. Files without a matching row are
    skipped.

    Args:
        image_dirs: iterable of directories to scan for ``.jpg`` files.
        csv_file: path to the annotation CSV; must contain an ``ID`` column
            and the ``annotation`` column.
        annotation: name of the CSV column used as the label.
        transform: optional callable applied to each loaded RGB image.

    Raises:
        ValueError: if ``csv_file`` or ``image_dirs`` is ``None``; also
            propagated from ``int()`` when a ``.jpg`` file name does not
            start with an integer.
    """

    def __init__(self, image_dirs, csv_file, annotation, transform=None):
        if csv_file is None or image_dirs is None:
            raise ValueError('Both csv file and image folders must be provided.')

        self.transform = transform

        # Load the CSV once and build an ID -> label lookup instead of
        # filtering the whole frame for every image. ``setdefault`` keeps the
        # first row when an ID appears more than once.
        data = pd.read_csv(csv_file)
        label_by_id = {}
        for sample_id, label in zip(data['ID'].values, data[annotation].values):
            label_by_id.setdefault(sample_id, label)

        image_paths = []
        labels = []
        for image_dir in image_dirs:
            # os.listdir order is arbitrary; sorting keeps sample indices (and
            # any index-based split built on them) stable between runs.
            for image_file in sorted(os.listdir(image_dir)):
                if not image_file.endswith('.jpg'):
                    continue
                image_id = int(image_file.split('_')[0])
                if image_id in label_by_id:
                    image_paths.append(os.path.join(image_dir, image_file))
                    labels.append(label_by_id[image_id])

        # Store as NumPy arrays
        self.image_paths = np.array(image_paths)
        self.labels = np.array(labels)

    def __len__(self):
        """Number of images that were matched to a label."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``; the image is RGB."""
        img_path = self.image_paths[idx]
        # Close the underlying file as soon as the pixels have been decoded.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Image preprocessing pipeline applied to every sample by AnnotationDataset.
transform = transforms.Compose([
    transforms.Resize((224, 224)), # resize to 224x224 (the original note said ResNet50, but this notebook feeds DenseNet201 and VGG19)
    transforms.ToTensor(),  # convert the PIL image to a tensor
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # per-channel normalization; presumably the ImageNet mean/std used by the pretrained weights — confirm
])

In [None]:
## PATH ##
# Root of the project data on the mounted Drive (keeps its trailing slash).
data_dir = '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/'

# Annotation table holding the labels.
csv_file = f'{data_dir}data/annotation/annotation_class2.csv'

## train PATH ##
# Every sub-folder of train/chin is used for training, except hidden entries
# and the two folders reserved for validation/evaluation below.
chin_train_root = data_dir + 'data/image/Orientation/train/chin'
held_out_folders = ('chin_origin', 'smart_pad')
train_image_dirs = [
    os.path.join(chin_train_root, entry)
    for entry in os.listdir(chin_train_root)
    if not entry.startswith('.') and entry not in held_out_folders
]

## test PATH ##
# The held-out folders from both the train/ and val/ trees.
# NOTE(review): train/chin/chin_origin and train/chin/smart_pad sit next to the
# folders trained on; if those are augmentations of the same images, the
# validation/evaluation data overlaps the training data — confirm.
val_image_dirs = [
    f'{data_dir}data/image/Orientation/{split}/chin/{source}'
    for source in held_out_folders
    for split in ('train', 'val')
]

## save PATH ##
# Directory prefix where model checkpoints are written.
save_dir = f'{data_dir}수린님/ensemble/model/'

In [None]:
# Display the resolved training folders as a sanity check.
train_image_dirs

['/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/horizon',
 '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/color_minus_10',
 '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/color_plus_10',
 '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/rotation_plus_10',
 '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/rotation_minus_10',
 '/content/drive/MyDrive/Final_project_2조/02_2. 전처리 및 EDA_이미지/data/image/Orientation/train/chin/background']

In [None]:
from sklearn.model_selection import train_test_split

## Training dataset ##
train_dataset = AnnotationDataset(
    image_dirs=train_image_dirs,
    csv_file=csv_file,
    annotation='chin_sagging',
    transform=transform,
)

## Validation / evaluation pool ##
test_dataset = AnnotationDataset(
    image_dirs=val_image_dirs,
    csv_file=csv_file,
    annotation='chin_sagging',
    transform=transform,
)

# Split the pool 50/50 by sample index into validation and evaluation halves.
indices = np.arange(len(test_dataset))
valid_indices, eval_indices = train_test_split(indices, random_state=42, test_size=0.5)

valid_subset, eval_subset = (
    torch.utils.data.Subset(test_dataset, split_idx)
    for split_idx in (valid_indices, eval_indices)
)

# Sizes of: training set, full pool, validation half, evaluation half.
tuple(len(ds) for ds in (train_dataset, test_dataset, valid_subset, eval_subset))

(3086, 1930, 965, 965)

In [None]:
#####################
# 모델 훈련 및 평가 #
#####################

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters for this run
num_classes = 2
lr = 1e-4
batch_size = 16
weight_decay = 1e-5
num_epochs = 50
today = '0828'   # tag embedded in the checkpoint file name
seed = 42

# Seed the RNGs that drive weight initialisation of the new head and the
# shuffling of the training loader, so a re-run starts from the same state.
torch.manual_seed(seed)
np.random.seed(seed)

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_subset, batch_size=batch_size, shuffle=False)

model = DenseNet201_VGG19_Ensemble(num_classes=num_classes)

criterion = nn.CrossEntropyLoss(weight=chin_sagging_class_weights)  # class-weighted loss
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
# NOTE(review): T_max=20 is shorter than num_epochs=50; with cosine annealing
# the learning rate is expected to climb again after epoch 20 — confirm that
# this is intended rather than T_max=num_epochs.
scheduler = CosineAnnealingLR(optimizer, T_max=20, eta_min=1e-6)

trainer = ModelTrainer(model, train_loader, valid_loader, criterion, optimizer, scheduler, device)
trainer.train_and_val(num_epochs=num_epochs, save_dir=save_dir, today=today)


Downloading: "https://download.pytorch.org/models/densenet201-c1103571.pth" to /root/.cache/torch/hub/checkpoints/densenet201-c1103571.pth
100%|██████████| 77.4M/77.4M [00:00<00:00, 158MB/s]
Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to /root/.cache/torch/hub/checkpoints/vgg19-dcbb9e9d.pth
100%|██████████| 548M/548M [00:06<00:00, 84.8MB/s]


Epoch [1/50], Time: 1067.2s, Learning rate: 9.939057285945933e-05
Training Loss: 0.4028, Accuracy: 80.98%
Validation Loss: 0.1949, Accuracy: 91.81%
Validation loss decreased (inf --> 0.1949). Saving model...
Epoch [2/50], Time: 132.9s, Learning rate: 9.757729755661012e-05
Training Loss: 0.4969, Accuracy: 77.54%
Validation Loss: 0.3087, Accuracy: 89.64%
EarlyStopping counter: 1 out of 10
Epoch [3/50], Time: 132.5s, Learning rate: 9.460482294732422e-05
Training Loss: 0.2727, Accuracy: 89.24%
Validation Loss: 0.1812, Accuracy: 93.06%
Validation loss decreased (0.1949 --> 0.1812). Saving model...
Epoch [4/50], Time: 133.2s, Learning rate: 9.054634122155992e-05
Training Loss: 0.1843, Accuracy: 92.77%
Validation Loss: 0.1497, Accuracy: 94.09%
Validation loss decreased (0.1812 --> 0.1497). Saving model...
Epoch [5/50], Time: 133.6s, Learning rate: 8.550178566873411e-05
Training Loss: 0.1526, Accuracy: 93.71%
Validation Loss: 0.2004, Accuracy: 91.50%
EarlyStopping counter: 1 out of 10
Epoch [6

In [None]:
# Evaluation
from sklearn.metrics import classification_report

num_classes = 2
lr = 1e-4   # not used by the evaluation itself; kept so the cell defines the same names as before
today = '0828'

criterion = nn.CrossEntropyLoss(weight=chin_sagging_class_weights)

# The checkpoint is a plain state_dict (tensors only), so it can be loaded with
# weights_only=True; this avoids arbitrary unpickling and the FutureWarning
# that torch.load emits when the argument is omitted.
best_model_state_dict = torch.load(
    f'{save_dir}ensemble_chin_{today}.pt',
    map_location=device,
    weights_only=True,
)

eval_loader = DataLoader(eval_subset, batch_size=1, shuffle=False)
evaluator = ModelEvaluator(best_model_state_dict, eval_loader, criterion, device)
eval_pred, eval_true = evaluator.evaluate()

# Per-class precision / recall / F1 on the evaluation half
report = classification_report(eval_true, eval_pred)

print(report)

  best_model_state_dict = torch.load(f'{save_dir}ensemble_chin_{today}.pt', map_location=device)


Evaluation Accuracy: 95.34%
              precision    recall  f1-score   support

           0       0.97      0.96      0.96       645
           1       0.92      0.94      0.93       320

    accuracy                           0.95       965
   macro avg       0.95      0.95      0.95       965
weighted avg       0.95      0.95      0.95       965

