In [None]:
import os
import time

import pandas as pd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from torchvision import transforms
import timm

from sklearn.model_selection import train_test_split


In [None]:
# Path configuration
image_dir = './images'
label_dir = './labels'
output_image_dir = './cropped/images'
os.makedirs(output_image_dir, exist_ok=True)

# One row per saved crop; written to the label CSV at the end
records = []

# Walk the label files in sorted order: os.listdir returns entries in
# arbitrary order, which would make the CSV row order (and therefore the
# seeded train/val split built from it) differ between runs.
for filename in tqdm(sorted(os.listdir(label_dir))):
    if not filename.endswith('.txt'):
        continue

    image_id = filename[:-4]  # strip the ".txt" suffix
    img_path = os.path.join(image_dir, image_id + '.png')
    label_path = os.path.join(label_dir, filename)

    if not os.path.exists(img_path):
        print(f"[경고] 이미지 없음: {img_path}")
        continue

    # Read every annotation line of this image
    with open(label_path, 'r') as f:
        lines = f.readlines()

    # Open the image in a context manager so its file handle is released
    # before moving on to the next file.
    with Image.open(img_path) as image:
        w, h = image.size

        for idx, line in enumerate(lines):
            parts = line.strip().split()
            if len(parts) != 5:
                print(f"[무시] 잘못된 라인: {line}")
                continue

            # Each line is "class cx cy bw bh"; skip lines whose fields are
            # not numeric instead of aborting the whole run.
            try:
                class_id, cx, cy, bw, bh = map(float, parts)
            except ValueError:
                print(f"[무시] 잘못된 라인: {line}")
                continue
            class_id = int(class_id)

            # Relative (centre, size) coordinates -> absolute pixel corners
            xmin = int((cx - bw / 2) * w)
            ymin = int((cy - bh / 2) * h)
            xmax = int((cx + bw / 2) * w)
            ymax = int((cy + bh / 2) * h)

            # Clamp the box to the image bounds
            xmin = max(0, xmin)
            ymin = max(0, ymin)
            xmax = min(w, xmax)
            ymax = min(h, ymax)

            # A box with no area after clamping cannot be saved as an image;
            # skip it rather than crash in crop.save().
            if xmax <= xmin or ymax <= ymin:
                print(f"[무시] 크기가 0인 박스: {image_id} (라인 {idx})")
                continue

            crop = image.crop((xmin, ymin, xmax, ymax))

            # Save the crop as "<image_id>_<line index>.png"
            cropped_filename = f"{image_id}_{idx}.png"
            cropped_path = os.path.join(output_image_dir, cropped_filename)
            crop.save(cropped_path)

            # Record the crop and its class for the CSV
            records.append({'filename': cropped_filename, 'label': class_id})

# Write the CSV; explicit columns keep the header present even when no crop
# was produced, so the file can still be read back.
df = pd.DataFrame(records, columns=['filename', 'label'])
df.to_csv('./cropped/labels.csv', index=False)
print("크롭 완료 및 CSV 저장 완료")


100%|██████████| 2065/2065 [03:53<00:00,  8.84it/s]

✅ 크롭 완료 및 CSV 저장 완료





In [None]:
class PillDataset(Dataset):
    """Dataset of cropped images described by a (filename, label) table.

    Args:
        dataframe: Table read by column position: column 0 holds image file
            names relative to ``base_path``, column 1 holds integer labels.
            Its index is reset so positional access matches ``__len__``.
        transform: Optional callable applied to the loaded RGB PIL image.
        base_path: Directory that contains the image files.
    """

    def __init__(self, dataframe, transform=None, base_path='./cropped/images'):
        self.data = dataframe.reset_index(drop=True)
        self.transform = transform
        self.base_path = base_path

    def __len__(self):
        """Return the number of rows (samples) in the table."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        Raises:
            FileNotFoundError: If the image file for this row does not exist.
        """
        # Normalise Windows separators so the path looks the same everywhere
        img_path = os.path.join(self.base_path, self.data.iloc[idx, 0]).replace('\\', '/')
        label = int(self.data.iloc[idx, 1])

        if not os.path.exists(img_path):
            raise FileNotFoundError(f"이미지 파일 없음: {img_path}")

        # Load with PIL (already imported by this notebook) instead of cv2,
        # which is never imported here. convert('RGB') reads the pixel data
        # and returns a new image, so the file can be closed right away.
        with Image.open(img_path) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        return image, label


In [10]:
# Transform pipelines built around the ImageNet normalization statistics
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Training: resize, light augmentation (random flip / rotation), then
# tensor conversion and normalization.
train_transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(degrees=10),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# Validation: same resize and normalization, no augmentation.
val_transform = transforms.Compose(
    [
        transforms.Resize(size=(224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)


In [11]:
# Load the label CSV and split it into train / validation sets
# (20% validation, stratified by label, fixed seed).
df = pd.read_csv('./cropped/labels.csv')
train_df, val_df = train_test_split(
    df,
    test_size=0.2,
    stratify=df['label'],
    random_state=42,
)

# Each split gets its own transform pipeline
train_dataset = PillDataset(train_df, transform=train_transform)
val_dataset = PillDataset(val_df, transform=val_transform)

# num_workers=0 was chosen as a compromise between stability and speed;
# only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)

In [12]:
# Sanity check: measure how long it takes to load the first training batch.
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# intervals, unlike wall-clock time.time(), which can jump.
start = time.perf_counter()
for images, labels in train_loader:
    print(f"✅ 첫 배치 로딩 시간: {time.perf_counter() - start:.2f}초")
    break


✅ 첫 배치 로딩 시간: 0.53초


In [13]:
# EfficientNetV2-S model definition
NUM_CLASSES = df['label'].nunique()

# CrossEntropyLoss treats each label as an index into the NUM_CLASSES logits,
# so labels must be exactly 0..NUM_CLASSES-1. Fail here with a readable
# message instead of an index error in the middle of training.
label_values = sorted(df['label'].unique().tolist())
if label_values != list(range(NUM_CLASSES)):
    raise ValueError(
        f"라벨이 0..{NUM_CLASSES - 1} 범위의 연속된 정수가 아님: {label_values}"
    )

# Pretrained backbone with its classifier replaced by a fresh linear head
# sized to this dataset.
model = timm.create_model('efficientnetv2_rw_s', pretrained=True)
model.classifier = nn.Linear(model.classifier.in_features, NUM_CLASSES)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [14]:
# Loss function and optimizer: cross-entropy over the class logits, and Adam
# over all model parameters with a learning rate of 1e-4.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)


In [20]:
def train_one_epoch(model, dataloader, optimizer, criterion, device):
    """Run one optimization pass over ``dataloader``.

    Puts the model in training mode, and for every batch moves the data to
    ``device``, performs a forward/backward pass and an optimizer step.

    Returns:
        ``(mean_loss, accuracy)`` where the loss is weighted by batch size and
        accuracy is the fraction of correct top-1 predictions, or
        ``(0.0, 0.0)`` when the dataloader yields no samples.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_images, batch_labels in dataloader:
        batch_images = batch_images.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_images)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # The loss value is multiplied by the batch size so that the final
        # division by the sample count gives a per-sample average.
        loss_sum += batch_loss.item() * batch_images.size(0)
        top1 = logits.max(dim=1).indices
        n_correct += (top1 == batch_labels).sum().item()
        n_seen += batch_labels.size(0)

    if n_seen:
        return loss_sum / n_seen, n_correct / n_seen
    return 0.0, 0.0


In [22]:
def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without tracking gradients.

    Puts the model in eval mode and accumulates loss and top-1 correctness
    over every batch after moving it to ``device``.

    Returns:
        ``(mean_loss, accuracy)`` where the loss is weighted by batch size, or
        ``(0.0, 0.0)`` when the dataloader yields no samples.
    """
    with torch.no_grad():
        model.eval()
        loss_sum = 0.0
        n_correct = 0
        n_seen = 0

        for batch_images, batch_labels in dataloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)
            # Weight by batch size so the final average is per sample
            loss_sum += batch_loss.item() * batch_images.size(0)

            top1 = logits.max(dim=1).indices
            n_correct += (top1 == batch_labels).sum().item()
            n_seen += batch_labels.size(0)

        if n_seen:
            return loss_sum / n_seen, n_correct / n_seen
        return 0.0, 0.0


In [17]:
# Report whether a CUDA device is available to PyTorch.
if torch.cuda.is_available():
    print("cuda")
else:
    print("cpu")

cuda


In [23]:
# Training run: at most EPOCHS epochs, stopping early once validation
# accuracy has failed to improve for `patience` consecutive epochs.
EPOCHS = 20
best_val_acc = 0.0
patience = 7
epochs_no_improve = 0

for epoch in range(EPOCHS):
    train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")

    improved = val_acc > best_val_acc
    if not improved:
        epochs_no_improve += 1
        print(f"⚠️ No improvement for {epochs_no_improve} epoch(s).")
    else:
        # Strictly better validation accuracy: checkpoint the weights and
        # reset the early-stopping counter.
        best_val_acc = val_acc
        torch.save(model.state_dict(), "best_model.pth")
        print("✅ Best model saved.")
        epochs_no_improve = 0

    if epochs_no_improve >= patience:
        print(f"🛑 Early stopping triggered after {epoch+1} epochs.")
        break


Epoch 1/20 | Train Loss: 0.1950, Acc: 0.9792 | Val Loss: 0.0805, Acc: 0.9975
✅ Best model saved.
Epoch 2/20 | Train Loss: 0.0411, Acc: 0.9980 | Val Loss: 0.0269, Acc: 0.9994
✅ Best model saved.
Epoch 3/20 | Train Loss: 0.0176, Acc: 0.9985 | Val Loss: 0.0218, Acc: 0.9988
⚠️ No improvement for 1 epoch(s).
Epoch 4/20 | Train Loss: 0.0114, Acc: 0.9995 | Val Loss: 0.0149, Acc: 0.9994
⚠️ No improvement for 2 epoch(s).
Epoch 5/20 | Train Loss: 0.0073, Acc: 0.9995 | Val Loss: 0.0135, Acc: 0.9988
⚠️ No improvement for 3 epoch(s).
Epoch 6/20 | Train Loss: 0.0076, Acc: 0.9989 | Val Loss: 0.0117, Acc: 0.9994
⚠️ No improvement for 4 epoch(s).
Epoch 7/20 | Train Loss: 0.0045, Acc: 0.9995 | Val Loss: 0.0099, Acc: 0.9994
⚠️ No improvement for 5 epoch(s).
Epoch 8/20 | Train Loss: 0.0033, Acc: 0.9997 | Val Loss: 0.0101, Acc: 0.9988
⚠️ No improvement for 6 epoch(s).
Epoch 9/20 | Train Loss: 0.0024, Acc: 0.9998 | Val Loss: 0.0087, Acc: 0.9994
⚠️ No improvement for 7 epoch(s).
🛑 Early stopping triggered af