In [1]:
import os
import cv2
import glob
import sys
import configparser
import numpy as np

import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim

from torch.utils.data.dataset import random_split
from torch.utils.data import DataLoader, Dataset
from torchvision import models, datasets

from PIL import Image
from tqdm import tqdm

In [2]:
model_name = "your_model_name.pth"
data_dir = "./test"
batch_size = 32
num_epochs = 30

In [3]:
# def calculate_norm(dataset):
#     # dataset의 axis=1, 2에 대한 평균 산출
#     mean_ = np.array([np.mean(x.numpy(), axis=(1, 2)) for x, _ in dataset])
#     # r, g, b 채널에 대한 각각의 평균 산출
#     mean_r = mean_[:, 0].mean()
#     mean_g = mean_[:, 1].mean()
#     mean_b = mean_[:, 2].mean()

#     # dataset의 axis=1, 2에 대한 표준편차 산출
#     std_ = np.array([np.std(x.numpy(), axis=(1, 2)) for x, _ in dataset])
#     # r, g, b 채널에 대한 각각의 표준편차 산출
#     std_r = std_[:, 0].mean()
#     std_g = std_[:, 1].mean()
#     std_b = std_[:, 2].mean()
    
#     return (mean_r, mean_g, mean_b), (std_r, std_g, std_b)

In [4]:
# transform = transforms.Compose([
#     transforms.ToTensor(),
# ])

# train_dataset = datasets.ImageFolder(root=data_dir, transform=transform)

In [5]:
# mean_, std_ = calculate_norm(train_dataset)
# mean_ = np.around(mean_, 3)
# std_ = np.around(std_, 3)
# print(f'평균(R,G,B): {mean_}\n표준편차(R,G,B): {std_}')
# transforms_set = {"resize": (384, 384), "normalize": (list(mean_), list(std_))}

In [6]:
mean_ = [0.485, 0.456, 0.406]
std_ = [0.229, 0.224, 0.225]
transforms_set = {"resize": (384, 384), "normalize": (list(mean_), list(std_))}

In [7]:
list(mean_)

[0.485, 0.456, 0.406]

In [8]:
list(std_)

[0.229, 0.224, 0.225]

In [9]:
transform_train = transforms.Compose([
    transforms.Resize((384, 384)),
    transforms.ToTensor(),
    transforms.Normalize(mean=list(mean_), std=list(std_)),
])

In [10]:
train_dataset = datasets.ImageFolder(root=data_dir, transform=transform_train)
idx_to_cls = {v: k for k, v in train_dataset.class_to_idx.items()}

print(f"class : {idx_to_cls}")

class : {0: 'NG', 1: 'OK'}


In [11]:
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

In [12]:
# 검증 데이터셋에도 동일한 변환 적용
val_dataset.dataset.transform = transform_train

In [13]:
# DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

In [14]:
model = models.efficientnet_v2_s(weights=None)

class_folder_dir = glob.glob(data_dir + "/*")
num_classes = len(class_folder_dir)  # Change to your number of classes
print(f"number of classes : {num_classes}")
model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

number of classes : 2


In [15]:
model.to(device)

model = nn.DataParallel(model)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

In [16]:
if not os.path.exists("./config"):
    os.makedirs("./config")

config = configparser.ConfigParser()
config.add_section(str(model_name.split(".")[0]))
config.set(str(model_name.split(".")[0]), "model_name", model_name)
config.set(str(model_name.split(".")[0]), "transforms", str(transforms_set))
config.set(str(model_name.split(".")[0]), "idx_to_cls", str(idx_to_cls))
config.set(str(model_name.split(".")[0]), "num_classes", str(num_classes))

with open("./config/config.ini", "a") as f:
    config.write(f)

In [17]:
def train_and_validate_model(train_loader, val_loader, model, criterion, optimizer, num_epochs=25):
    best_acc = 0.0
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0

        with tqdm(train_loader, ncols=160, ascii=" =", unit="batch") as tepoch:
            for inputs, labels in tepoch:
                tepoch.set_description(f"Epoch {epoch + 1}")

                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                running_corrects += torch.sum(preds == labels.data)

                tepoch.set_postfix(loss=loss.item())

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)

        # 검증 단계
        model.eval()
        val_loss = 0.0
        val_corrects = 0
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            with torch.no_grad():
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                _, preds = torch.max(outputs, 1)
                val_loss += loss.item() * inputs.size(0)
                val_corrects += torch.sum(preds == labels.data)

        val_loss = val_loss / len(val_loader.dataset)
        val_acc = val_corrects.double() / len(val_loader.dataset)

        print(f'Epoch {epoch + 1}/{num_epochs}, '
              f'Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}, '
              f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}')

        # 모델 저장 조건 추가: 현재 검증 정확도가 지금까지의 최고보다 더 높으면 모델 저장
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), model_name)

    return model

In [18]:
model = train_and_validate_model(train_loader, val_loader, model, criterion, optimizer, num_epochs=num_epochs)

Epoch 1:   3%|==                                                                                                 | 7/234 [00:09<04:55,  1.30s/batch, loss=0.503]

KeyboardInterrupt



In [19]:
# clear GPU MEM
# for p in model.parameters():
#     if p.grad is not None:
#         del p.grad 
# torch.cuda.empty_cache()