In [2]:
import os
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from efficientnet_pytorch import EfficientNet
import albumentations as A
from albumentations.pytorch import ToTensorV2
from PIL import Image
from tqdm import tqdm
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Root directory of the image data; the train/val/test subfolders are joined onto it later.
data_dir = "./data" 

INFO:albumentations.check_version:A new version of Albumentations is available: 1.4.11 (you have 1.4.7). Upgrade using: pip install --upgrade albumentations


In [4]:
# Hyperparameter settings
batch_size = 32  # samples per DataLoader batch
num_epochs = 10  # number of epochs passed to train_model
learning_rate = 0.001  # learning rate handed to the Adam optimizer
num_classes = 3  # figure, table, trash

class CustomDataset(Dataset):
    """Wrap an indexable (image, label) dataset and apply a keyword-style transform.

    The wrapped dataset must support ``len()`` and integer indexing that yields
    ``(image, label)`` pairs. The optional transform is called as
    ``transform(image=array)`` and must return a mapping with an ``'image'`` key
    (the albumentations calling convention).
    """

    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        sample, target = self.dataset[idx]
        # The transform works on arrays, so convert whatever the dataset yields.
        pixels = np.array(sample)
        # A falsy transform (e.g. None) means: hand back the raw array untouched.
        if not self.transform:
            return pixels, target
        augmented = self.transform(image=pixels)
        return augmented['image'], target


# Training pipeline: fixed 256x256 resize, random flips / 90-degree rotations,
# then normalization and conversion to a tensor.
train_transform = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.Normalize(),
    ToTensorV2(),
])

# Validation and test pipelines: the same resize / normalize / to-tensor steps
# with no random augmentation. Each split keeps its own Compose instance.
val_transform = A.Compose([A.Resize(256, 256), A.Normalize(), ToTensorV2()])
test_transform = A.Compose([A.Resize(256, 256), A.Normalize(), ToTensorV2()])

# Load the datasets: one ImageFolder per split, each wrapped so that the
# split-specific albumentations pipeline is applied on access.
image_datasets = {
    'train': CustomDataset(datasets.ImageFolder(os.path.join(data_dir, 'train')), transform=train_transform),
    'val': CustomDataset(datasets.ImageFolder(os.path.join(data_dir, 'val')), transform=val_transform),
    'test': CustomDataset(datasets.ImageFolder(os.path.join(data_dir, 'test')), transform=test_transform)
}

# Only the training split is shuffled. Validation and test are iterated in a
# fixed order so that evaluation runs are deterministic and reproducible.
dataloaders = {
    x: DataLoader(
        image_datasets[x],
        batch_size=batch_size,
        shuffle=(x == 'train'),
        num_workers=4,
    )
    for x in ['train', 'val', 'test']
}

In [9]:
# Load a pretrained EfficientNet-B4 and replace its classifier head
model = EfficientNet.from_pretrained('efficientnet-b4')
num_ftrs = model._fc.in_features
model._fc = nn.Linear(num_ftrs, num_classes)  # new head with one output per class
model = model.to(device)

# Loss function and optimizer
# 2259 : 2361 : 1004 -- presumably the per-class sample counts (figure : table : trash);
# the weights below are consistent with 2361 / count -- TODO confirm.
class_weights = torch.tensor([1.05, 1.00, 2.35], dtype=torch.float32).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Learning-rate scheduler (StepLR)
# NOTE(review): step_size=10 equals num_epochs=10 and train_model steps the scheduler
# once per epoch, so the halving only lands after the last epoch -- confirm this is intended.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)

from torchmetrics import F1Score

# Weighted multiclass F1 metric, moved to the same device as the model.
f1_score = F1Score(num_classes=num_classes, average='weighted', task='multiclass').to(device)

# Training and validation routine
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    """Fine-tune ``model`` and return it with the best-validation-F1 weights loaded.

    Each epoch runs a 'train' phase followed by a 'val' phase. The function
    reads the module-level ``dataloaders``, ``image_datasets``, ``f1_score``
    and ``device`` objects rather than taking them as arguments.

    Args:
        model: Network to optimize; updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        scheduler: Learning-rate scheduler, stepped once at the end of every epoch.
        num_epochs: Number of epochs to run.

    Returns:
        ``model`` after loading the state dict that scored the highest
        validation F1 (the initial weights if no epoch scored above 0).
    """
    best_f1 = 0.0
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        for phase in ('train', 'val'):
            is_training = phase == 'train'
            # train(True) enables training mode; train(False) is the same as eval().
            model.train(is_training)

            loss_sum = 0.0
            f1_score.reset()

            for inputs, labels in tqdm(dataloaders[phase], desc=f'{phase} Iteration'):
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()

                # Autograd is only recorded while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    preds = outputs.max(dim=1).indices
                    loss = criterion(outputs, labels)

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Scale the batch loss by the batch size so the epoch figure is per sample.
                loss_sum += loss.item() * inputs.size(0)
                f1_score.update(preds, labels)

            epoch_loss = loss_sum / len(image_datasets[phase])
            epoch_f1 = f1_score.compute()

            print(f'{phase} Loss: {epoch_loss:.4f} F1: {epoch_f1:.4f}')

            # Snapshot the weights whenever validation F1 strictly improves.
            if not is_training and epoch_f1 > best_f1:
                best_f1 = epoch_f1
                best_model_wts = copy.deepcopy(model.state_dict())

        scheduler.step()

    print(f'Best val F1: {best_f1:.4f}')
    model.load_state_dict(best_model_wts)
    return model
# Train the model
model = train_model(model, criterion, optimizer, scheduler, num_epochs=num_epochs)



Loaded pretrained weights for efficientnet-b4
Epoch 0/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.15it/s]


train Loss: 0.1415 F1: 0.9598


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.55it/s]


val Loss: 0.1317 F1: 0.9834
Epoch 1/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.17it/s]


train Loss: 0.0527 F1: 0.9822


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.90it/s]


val Loss: 0.0660 F1: 0.9930
Epoch 2/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.17it/s]


train Loss: 0.0662 F1: 0.9836


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.80it/s]


val Loss: 0.0274 F1: 0.9896
Epoch 3/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.15it/s]


train Loss: 0.0467 F1: 0.9859


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.79it/s]


val Loss: 0.0313 F1: 0.9905
Epoch 4/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.16it/s]


train Loss: 0.0264 F1: 0.9909


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.59it/s]


val Loss: 0.0130 F1: 0.9974
Epoch 5/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.17it/s]


train Loss: 0.0266 F1: 0.9915


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.68it/s]


val Loss: 0.0101 F1: 0.9974
Epoch 6/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.15it/s]


train Loss: 0.0349 F1: 0.9879


val Iteration: 100%|██████████| 37/37 [00:03<00:00, 12.09it/s]


val Loss: 0.0244 F1: 0.9939
Epoch 7/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.17it/s]


train Loss: 0.0489 F1: 0.9845


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.81it/s]


val Loss: 0.0098 F1: 0.9931
Epoch 8/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.16it/s]


train Loss: 0.0295 F1: 0.9904


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.70it/s]


val Loss: 0.0148 F1: 0.9965
Epoch 9/9
----------


train Iteration: 100%|██████████| 176/176 [00:42<00:00,  4.15it/s]


train Loss: 0.0103 F1: 0.9961


val Iteration: 100%|██████████| 37/37 [00:02<00:00, 12.73it/s]

val Loss: 0.0036 F1: 0.9974
Best val F1: 0.9974





In [10]:
# Evaluate on the test split, then persist the fine-tuned weights
model.eval()
f1_score.reset()

# Inference only: no gradients are needed for any test batch.
with torch.no_grad():
    for inputs, labels in dataloaders['test']:
        inputs, labels = inputs.to(device), labels.to(device)
        preds = model(inputs).max(dim=1).indices
        f1_score.update(preds, labels)

test_f1 = f1_score.compute()
print(f'Test F1: {test_f1:.4f}')
torch.save(model.state_dict(), 'efficientnet_finetuned.pth')

Test F1: 0.9991


In [8]:
import torch
from torchvision import transforms
from PIL import Image

# Model loading helper
def load_model(model_path, device):
    """Rebuild the fine-tuned EfficientNet-B4 and restore its weights from disk.

    Reads the module-level ``num_classes`` to size the classifier head.

    Args:
        model_path: Path to a state dict written with
            ``torch.save(model.state_dict(), ...)``.
        device: ``torch.device`` the restored model should live on.

    Returns:
        The model, moved to ``device`` and switched to eval mode.
    """
    model = EfficientNet.from_pretrained('efficientnet-b4', num_classes=num_classes)
    model._fc = nn.Linear(model._fc.in_features, num_classes)
    # map_location remaps the saved tensors onto the target device. Without it,
    # a checkpoint saved from a CUDA model cannot be loaded on a CPU-only machine.
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model = model.to(device)
    model.eval()
    return model

# Image preprocessing helper
def preprocess_image(image_path):
    """Load an image file and turn it into a normalized single-image batch.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The transformed image tensor with a leading batch dimension of size 1.
    """
    transform = transforms.Compose([
        # (256, 256) forces an exact 256x256 output, matching A.Resize(256, 256)
        # used by the training/validation pipelines. A bare int would only scale
        # the shorter side and keep the aspect ratio.
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    # Close the file handle promptly. convert('RGB') yields an independent image
    # (e.g. RGBA input loses its alpha channel).
    with Image.open(image_path) as source:
        image = source.convert('RGB')
    return transform(image).unsqueeze(0)  # add the batch dimension


# Prediction helper
def predict_image(model, image_path, device):
    """Classify one image file and return its class name.

    Args:
        model: Classifier called on a single-image batch.
        image_path: Path of the image to classify (read via ``preprocess_image``).
        device: Device the input batch is moved to before the forward pass.

    Returns:
        One of ``'figure'``, ``'table'`` or ``'trash'``.
    """
    # Map the predicted class index to its class name.
    idx_to_class = {0: 'figure', 1: 'table', 2: 'trash'}
    batch = preprocess_image(image_path).to(device)

    with torch.no_grad():
        logits = model(batch)
        best_idx = torch.max(logits, 1)[1]

    return idx_to_class[best_idx.item()]

# Model and device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): absolute, machine-specific path -- the checkpoint will not be found on another machine.
model_path = '/home/a2024712006/corning/corning-figure-classifier/efficientnet_finetuned.pth'
model = load_model(model_path, device)

# Image path and prediction
# NOTE(review): another absolute, machine-specific path.
image_path = '/home/a2024712006/image1_3.png'
predicted_label = predict_image(model, image_path, device)
print(f'The predicted label is: {predicted_label}')


Loaded pretrained weights for efficientnet-b4
The predicted label is: figure
