In [None]:
import os
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
from torch.utils.data import random_split
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision import models
from torchvision import transforms
from torchvision.models import ResNet152_Weights

In [11]:
def resize(img, target_size=256):
    """Scale ``img`` so that its shorter side is exactly ``target_size`` pixels.

    The aspect ratio is preserved and the longer side is floored to an integer.

    Args:
        img: Image-like object exposing ``size`` as ``(width, height)`` and a
            ``resize((width, height))`` method (e.g. a ``PIL.Image.Image``).
        target_size: Desired length of the shorter side, in pixels.

    Returns:
        Whatever ``img.resize`` returns for the computed ``(width, height)``.
    """
    w, h = img.size
    target_size = int(target_size)
    # Integer arithmetic on purpose: the float form int(w * (target_size / w))
    # can truncate to target_size - 1 (e.g. w == 49 yields 255 instead of 256).
    if w > h:
        new_size = (target_size * w // h, target_size)
    else:
        new_size = (target_size, target_size * h // w)
    return img.resize(new_size)


# Training configuration shared by the data pipeline and the training loop.
hyperparams = {
    "train_val_ratio" : 0.9,
    "batch_size" : 80,
    "learning_rate" : 0.0001,
    "epochs" : 15,
    # Seed for the train/val split so the same samples stay in the validation
    # set across kernel restarts (important when resuming from a checkpoint).
    "split_seed" : 42,
    "transform" : transforms.Compose(
        [
            transforms.Lambda(resize),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # Normalize computes (x - mean) / std, so std = 1/255 multiplies
            # the mean-centred values by 255.
            # NOTE(review): these values differ from the mean/std documented
            # for the torchvision ImageNet weights; any checkpoint trained
            # with them depends on them, so do not change without retraining.
            transforms.Normalize(
                mean=[0.48235, 0.45882, 0.40784],
                std=[1.0/255.0, 1.0/255.0, 1.0/255.0]
            )
        ]
    )
}

dataset = ImageFolder("./data/train", transform=hyperparams["transform"])

# Split into train / validation subsets with a fixed seed. An unseeded split
# would put different samples into validation on every run, so a resumed
# checkpoint would be evaluated on images it had already been trained on.
train_size = int(len(dataset) * hyperparams["train_val_ratio"])
val_size = len(dataset) - train_size
split_generator = torch.Generator().manual_seed(hyperparams["split_seed"])
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# Data loaders for training and validation.
train_dataloader = DataLoader(
    train_dataset, batch_size=hyperparams["batch_size"], shuffle=True, drop_last=True
)
# Validation does not need shuffling. drop_last=True discards the final
# partial batch, so validation covers full batches only.
val_dataloader = DataLoader(
    val_dataset, batch_size=hyperparams["batch_size"], shuffle=False, drop_last=True
)

model_path = "./models/rock_latest.pth"
has_checkpoint = os.path.exists(model_path)

# The ImageNet weights are only needed for a fresh start; when a checkpoint
# exists every parameter is overwritten by load_state_dict below.
model = models.resnet152(
    weights=None if has_checkpoint else ResNet152_Weights.IMAGENET1K_V1
)
# Replace the classification head so it outputs one logit per dataset class.
model.fc = nn.Linear(model.fc.in_features, len(dataset.classes))

if has_checkpoint:
    # map_location="cpu" lets a checkpoint saved on GPU load on any host;
    # weights_only=True restricts unpickling to tensors / plain containers.
    state_dict = torch.load(model_path, map_location="cpu", weights_only=True)
    model.load_state_dict(state_dict)
    print("model param load")

model param load


In [12]:
from torch.amp import autocast, GradScaler

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

# Mixed precision (autocast + gradient scaling) is only enabled on CUDA; on
# CPU both are switched off so the loop runs in plain float32.
use_amp = device == "cuda"

model = model.to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=hyperparams["learning_rate"])
scaler = GradScaler(device=device, enabled=use_amp)

for epoch in range(hyperparams["epochs"]):
    # ---- training ----
    cost = 0.0
    model.train()

    for img, cls in train_dataloader:
        img = img.to(device)
        cls = cls.to(device)

        optimizer.zero_grad()

        with autocast(device_type=device, enabled=use_amp):
            output = model(img)
            loss = criterion(output, cls)

        # Scale the loss before backward, then let the scaler step the
        # optimizer and update its scale factor.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        cost += loss.item()

    # Mean loss per batch; max() guards against an empty loader.
    cost = cost / max(len(train_dataloader), 1)

    # ---- validation ----
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for img, cls in val_dataloader:
            img = img.to(device)
            cls = cls.to(device)

            with autocast(device_type=device, enabled=use_amp):
                outputs = model(img)

            # argmax over logits equals argmax over softmax probabilities.
            outputs_classes = torch.argmax(outputs, dim=-1)

            correct += int(torch.eq(cls, outputs_classes).sum())
            # Count the samples actually seen so the accuracy stays correct
            # whatever the loader's batch size / drop_last settings are.
            total += cls.size(0)

    acc = correct / total * 100 if total else float("nan")
    print(f"Epoch : {epoch+1:4d}, Cost : {cost:.3f}, acc@1 : {acc:.2f}")

    # ---- checkpoint ----
    # Write to a temporary file first and then swap it into place, so an
    # interrupted save cannot leave the previous checkpoint deleted or
    # half-written.
    os.makedirs(os.path.dirname(model_path) or ".", exist_ok=True)
    tmp_model_path = model_path + ".tmp"
    torch.save(model.state_dict(), tmp_model_path)
    os.replace(tmp_model_path, model_path)


cuda
Epoch :    1, Cost : 0.192, acc@1 : 95.98
Epoch :    2, Cost : 0.185, acc@1 : 95.85


KeyboardInterrupt: 

In [13]:
import os
import torch
from torchvision import models, transforms
from PIL import Image
import pandas as pd

# Pick the device at runtime instead of assuming CUDA is present.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Rebuild the architecture (no pretrained download needed) and load the
# fine-tuned weights from the checkpoint.
model = models.resnet152()
model.fc = torch.nn.Linear(model.fc.in_features, len(dataset.classes))
# weights_only=True restricts unpickling to tensors / plain containers.
model.load_state_dict(
    torch.load(model_path, map_location=device, weights_only=True)
)
model = model.to(device).eval()

# Reuse the training/validation preprocessing so inference inputs are
# guaranteed to match what the model was trained on.
transform = hyperparams["transform"]

# Collect test images in a stable order; the extension match is
# case-insensitive so files such as "x.JPG" are not silently skipped.
test_dir = "./data/test"
image_files = sorted(
    f for f in os.listdir(test_dir) if f.lower().endswith((".jpg", ".jpeg"))
)

# Batched inference.
batch_size = 64
results = []

for start in range(0, len(image_files), batch_size):
    # Slicing also yields the final, possibly shorter, batch.
    batch_names = image_files[start:start + batch_size]

    batch_images = []
    for fname in batch_names:
        # Context manager closes the underlying file once it is decoded.
        with Image.open(os.path.join(test_dir, fname)) as img:
            batch_images.append(transform(img.convert("RGB")))

    input_tensor = torch.stack(batch_images).to(device)
    with torch.no_grad():
        outputs = model(input_tensor)
    preds = torch.argmax(outputs, dim=1).tolist()

    for name, pred in zip(batch_names, preds):
        image_id = os.path.splitext(name)[0]
        pred_label = dataset.classes[pred]
        results.append((image_id, pred_label))

# Write the submission file; to_csv overwrites any existing file.
df = pd.DataFrame(results, columns=["ID", "rock_type"])
sub_path = "./data/rock_submission.csv"
df.to_csv(sub_path, index=False)
