In [1]:
# 코랩에서 구글 드라이브 접근
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision.models.resnet import BasicBlock
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from torchvision import models, datasets
from PIL import Image
import pandas as pd
import os

In [3]:
class SteelDataset(Dataset):
    def __init__(self, csv_path, img_dir, transform=None):
        self.data = pd.read_csv(csv_path)
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_name = os.path.join(self.img_dir, str(self.data.iloc[idx, 1]), self.data.iloc[idx, 0])
        image = Image.open(img_name)
        # 이미지를 6등분하여 각각 (224, 224)로 리사이즈
        img_parts = []
        width, height = image.size
        crop_size = (width // 3, height // 2)
        for i in range(2):
            for j in range(3):
                left = j * crop_size[0]
                upper = i * crop_size[1]
                right = left + crop_size[0]
                lower = upper + crop_size[1]
                img_part = image.crop((left, upper, right, lower)).resize((224, 224))
                img_parts.append(img_part)

        label = int(self.data.iloc[idx, 1]) - 1  # 라벨을 0부터 시작하도록 변환
        if self.transform:
            img_parts = [self.transform(part) for part in img_parts]
        return img_parts, label

In [4]:
# 하이퍼파라미터 설정
batch_size = 32
learning_rate = 0.001
num_classes = 4

# 데이터 로드 및 전처리
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_dataset = SteelDataset(csv_path='/content/drive/MyDrive/졸업과제/severstal-steel-defect-detection/aug-train.csv',
                              img_dir='/content/drive/MyDrive/졸업과제/working/aug_train/',
                              transform=transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

In [5]:
class ResNetSteelClassifier(nn.Module):
    def __init__(self, num_classes):
        super(ResNetSteelClassifier, self).__init__()
        self.resnet = models.resnet18(pretrained=True)
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_features, num_classes)

    def forward(self, x):
        return self.resnet(x)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ResNetSteelClassifier(num_classes=4).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:01<00:00, 24.9MB/s]


In [None]:
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    with tqdm(train_loader, unit="batch") as t:
        for img_parts, labels in t:
            optimizer.zero_grad()
            outputs = []

            for part in img_parts:
                # images, labels = images.to(device), labels.to(device)
                part = part.to(device)
                part_outputs = model(part)
                part_outputs = part_outputs.to(device)
                outputs.append(part_outputs)

            outputs = torch.stack(outputs, dim=1).mean(dim=1)  # 이미지 부분의 출력 평균
            labels = labels.to(device)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            t.set_postfix(loss=running_loss / (t.n + 1))

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

In [None]:
# 학습된 모델 저장
torch.save(model.state_dict(), '/content/drive/MyDrive/졸업과제/모델/resnet_model_V1_epoch_10.pth')

In [7]:
# 모델 불러오기
loaded_model = ResNetSteelClassifier(num_classes=4).to(device)
loaded_model.load_state_dict(torch.load('/content/drive/MyDrive/졸업과제/모델/resnet_model_V1_epoch_10.pth'))
loaded_model.eval()

ResNetSteelClassifier(
  (resnet): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=Tr

In [8]:
# 모델 평가
preds = []
targets = []

with torch.no_grad(), tqdm(train_loader, unit="batch") as t:
    for img_parts, labels in t:

        optimizer.zero_grad()
        outputs = []

        for part in img_parts:
            # images, labels = images.to(device), labels.to(device)
            part = part.to(device)
            part_outputs = loaded_model(part)
            part_outputs = part_outputs.to(device)
            outputs.append(part_outputs)

        outputs = torch.stack(outputs, dim=1).mean(dim=1)
        _, predicted = torch.max(outputs, 1)
        targets.extend(labels.cpu().numpy())
        preds.extend(predicted.cpu().numpy())
        t.set_postfix(accuracy=accuracy_score(targets, preds))

# 평가 지표 계산
accuracy = accuracy_score(targets, preds)
precision = precision_score(targets, preds, average='weighted')
recall = recall_score(targets, preds, average='weighted')
f1 = f1_score(targets, preds, average='weighted')

print()
print("Train")
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1:.4f}")

100%|██████████| 125/125 [29:16<00:00, 14.05s/batch, accuracy=0.779]


Train
Accuracy: 0.7790
Precision: 0.7843
Recall: 0.7790
F1 Score: 0.7755





In [9]:
# 테스트 데이터 로드 및 전처리
test_dataset = SteelDataset(csv_path='/content/drive/MyDrive/졸업과제/severstal-steel-defect-detection/aug-test.csv',
                              img_dir='/content/drive/MyDrive/졸업과제/working/aug-test/',
                              transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=True)

# 모델 평가 및 평가 지표 계산 (테스트 데이터)
test_preds = []
test_targets = []

with torch.no_grad(), tqdm(test_loader, unit="batch") as t:
    for img_parts, labels in t:

        optimizer.zero_grad()
        outputs = []

        for part in img_parts:
            # images, labels = images.to(device), labels.to(device)
            part = part.to(device)
            part_outputs = loaded_model(part)
            part_outputs = part_outputs.to(device)
            outputs.append(part_outputs)

        outputs = torch.stack(outputs, dim=1).mean(dim=1)
        _, predicted = torch.max(outputs, 1)
        test_targets.extend(labels.cpu().numpy())
        test_preds.extend(predicted.cpu().numpy())
        t.set_postfix(test_accuracy=accuracy_score(test_targets, test_preds))

# 평가 지표 계산 (테스트 데이터)
test_accuracy = accuracy_score(test_targets, test_preds)
test_precision = precision_score(test_targets, test_preds, average='weighted')
test_recall = recall_score(test_targets, test_preds, average='weighted')
test_f1 = f1_score(test_targets, test_preds, average='weighted')

print()
print("Test")
print(f"Accuracy: {test_accuracy:.4f}")
print(f"Precision: {test_precision:.4f}")
print(f"Recall: {test_recall:.4f}")
print(f"F1 Score: {test_f1:.4f}")

100%|██████████| 32/32 [13:56<00:00, 26.14s/batch, test_accuracy=0.744]


Test
Accuracy: 0.7440
Precision: 0.7499
Recall: 0.7440
F1 Score: 0.7389





### 직접 구현된 코드

In [None]:
# ResNet 모델 정의
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=num_classes):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, blocks, stride=1):
        layers = []
        layers.append(block(self.in_planes, planes, stride))
        self.in_planes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_planes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# ResNet 모델 인스턴스 생성 및 GPU로 이동
model = ResNet(BasicBlock, [2, 2, 2, 2]).cuda()

# 손실 함수와 옵티마이저 설정
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# 모델 학습
for epoch in range(num_epochs):
    for images, labels in train_loader:
        images = images.cuda()
        labels = labels.cuda()

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

print("Training finished!")