In [2]:
import os
import shutil
import random
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
from sklearn.metrics import classification_report, confusion_matrix

In [3]:
# 데이터 경로
base_dir = "/home/ec2-user/SageMaker/data/Binary"

# 데이터 전처리
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

# DataLoader 설정
dataloaders = {
    x: DataLoader(
        datasets.ImageFolder(root=os.path.join(base_dir, x), transform=data_transforms[x]),
        batch_size=32, shuffle=(x == 'train'), num_workers=4
    )
    for x in ['train', 'val', 'test']
}

# 데이터 크기 및 클래스 이름 확인
dataset_sizes = {x: len(dataloaders[x].dataset) for x in ['train', 'val', 'test']}
class_names = dataloaders['train'].dataset.classes

print(f"Classes: {class_names}")
print(f"Dataset sizes: {dataset_sizes}")


Classes: ['NG', 'OK']
Dataset sizes: {'train': 3977, 'val': 1136, 'test': 570}


In [None]:
# EfficientNet

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models

# 디바이스 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Pre-trained EfficientNet 모델 불러오기
model = models.efficientnet_b0(pretrained=True)  # EfficientNet B0 모델 사용

# Output Layer 수정 (OK/NG 이진 분류)
num_features = model.classifier[1].in_features
model.classifier = nn.Sequential(
    nn.Linear(num_features, 1),
    nn.Sigmoid()  # 이진 분류
)
model = model.to(device)

# 손실 함수와 옵티마이저 설정
criterion = nn.BCELoss()  # Binary CrossEntropy Loss
optimizer = optim.Adam(model.parameters(), lr=0.001)


Downloading: "https://download.pytorch.org/models/efficientnet_b0_rwightman-7f5810bc.pth" to /home/ec2-user/.cache/torch/hub/checkpoints/efficientnet_b0_rwightman-7f5810bc.pth
100%|██████████| 20.5M/20.5M [00:00<00:00, 225MB/s]


In [10]:
def train_model(model, criterion, optimizer, dataloaders, dataset_sizes, num_epochs=25):
    best_model_wts = model.state_dict()
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f"Epoch {epoch + 1}/{num_epochs}")
        print("-" * 20)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device).float()  # Binary CrossEntropy에서는 float 타입 사용

                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs).squeeze(1)  # EfficientNet의 출력 크기 조정
                    preds = (outputs > 0.5).float()  # Threshold 0.5 적용
                    loss = criterion(outputs, labels)  # Loss 계산

                    if phase == 'train':
                        # Backward pass 및 최적화
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Loss 및 정확도 계산
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")

            # Best 모델 저장
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()

    # Best 모델 가중치 로드
    model.load_state_dict(best_model_wts)
    print(f"Best Validation Accuracy: {best_acc:.4f}")
    return model


In [11]:
def evaluate_model(model, dataloader):
    model.eval()
    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).float()  # BCELoss에 맞게 float으로 변환

            # Forward pass
            outputs = model(inputs).squeeze(1)  # EfficientNet 출력 크기 조정
            preds = (outputs > 0.5).float()  # 0.5를 기준으로 이진 분류

            # 결과 저장
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    # Confusion Matrix 출력
    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred))

    # Classification Report 출력
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))



In [None]:
# 모델 학습
model = train_model(model, criterion, optimizer, dataloaders, dataset_sizes, num_epochs=25)

# 테스트 데이터 평가
print("\nTesting the model...")
evaluate_model(model, dataloaders['test'])



Epoch 1/25
--------------------
train Loss: 0.1035 Acc: 0.9688
val Loss: 0.0965 Acc: 0.9569
Epoch 2/25
--------------------
train Loss: 0.0526 Acc: 0.9824
val Loss: 0.0614 Acc: 0.9701
Epoch 3/25
--------------------
train Loss: 0.0328 Acc: 0.9882
val Loss: 0.0465 Acc: 0.9903
Epoch 4/25
--------------------
train Loss: 0.0288 Acc: 0.9897
val Loss: 0.1004 Acc: 0.9762
Epoch 5/25
--------------------
train Loss: 0.0161 Acc: 0.9952
val Loss: 0.0619 Acc: 0.9868
Epoch 6/25
--------------------
train Loss: 0.0168 Acc: 0.9950
val Loss: 0.0314 Acc: 0.9903
Epoch 7/25
--------------------
train Loss: 0.0151 Acc: 0.9947
val Loss: 0.0334 Acc: 0.9921
Epoch 8/25
--------------------


In [None]:
# 테스트 데이터 평가

In [None]:
import time
from sklearn.metrics import classification_report, confusion_matrix, f1_score

def evaluate_model(model, dataloader):
    model.eval()
    y_true = []
    y_pred = []

    # 추론 시간 측정 시작
    start_time = time.time()

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device).float()

            outputs = model(inputs)
            preds = (outputs > 0.5).float()

            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())

    # 추론 시간 측정 종료
    end_time = time.time()
    inference_time = end_time - start_time

    # Confusion Matrix
    print("Confusion Matrix:")
    print(confusion_matrix(y_true, y_pred))

    # Classification Report
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred, target_names=class_names))

    # F1-score 점수 출력
    f1 = f1_score(y_true, y_pred, average="weighted")
    print(f"\nWeighted F1-score: {f1:.4f}")

    # 추론 시간 출력
    print(f"\nInference Time (Total): {inference_time:.4f} seconds")
    print(f"Average Inference Time per Sample: {inference_time / len(y_true):.4f} seconds")

# 테스트 데이터 평가
evaluate_model(model, dataloaders['test'])
