In [None]:
from google.colab import drive
import os
import sys

# Mount Google Drive at /content/drive (the paths used later in this notebook live under it)
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
# from categorize_img import categorize_imgs

In [None]:
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Image preprocessing.
# The task is gender & style classification, so the pipeline is kept simple:
# no object-detection style preprocessing, only resizing, cropping and light
# augmentation.
transform = transforms.Compose(
    [
        # Resize to the input size used for ResNet
        transforms.Resize(size=(224, 224)),
        # Augmentation: random horizontal flip
        transforms.RandomHorizontalFlip(),
        # Augmentation: random 224x224 crop after padding each side by 4 pixels
        transforms.RandomCrop(size=224, padding=4),
        transforms.ToTensor(),
        # Normalize with the ImageNet channel statistics
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

In [None]:
import shutil

def categorize_imgs(src, dest_dir):
    """Copy the ``.jpg`` files in ``src`` into per-style folders under ``dest_dir``.

    Each image is copied to ``<dest_dir>/<src folder name>/<style>/<filename>``,
    where ``style`` is the second-to-last ``_``-separated field of the file
    name. Files that do not end in ``.jpg`` are ignored. A file that cannot be
    processed (for example, its name has fewer than two ``_``-separated
    fields, or the copy fails) is reported and counted as failed; the
    remaining files are still processed.

    Args:
        src: Folder containing the source images. A trailing path separator
            is allowed.
        dest_dir: Root folder that receives the categorized copies.

    Returns:
        None. A summary with the processed / failed counts is printed.
    """
    src_folder = src

    # normpath strips a trailing separator; without it basename() would be ''
    # and every source folder would be merged directly into dest_dir.
    dest_folder = os.path.join(dest_dir, os.path.basename(os.path.normpath(src)))
    os.makedirs(dest_folder, exist_ok=True)

    # Track how many images were copied and how many failed
    processed_count = 0
    error_count = 0

    for filename in os.listdir(src_folder):
        if not filename.endswith('.jpg'):
            continue

        try:
            # Style is encoded in the file name (second-to-last '_' field)
            style = filename.split('_')[-2]

            # One sub-folder per style
            style_folder = os.path.join(dest_folder, style)
            os.makedirs(style_folder, exist_ok=True)

            # copy2 also preserves file metadata
            src_path = os.path.join(src_folder, filename)
            dest_path = os.path.join(style_folder, filename)
            shutil.copy2(src_path, dest_path)

            processed_count += 1

        except Exception as e:
            # Best effort: report the failure and continue with the next file
            error_count += 1
            print(f"Error processing {filename}: {str(e)}")

    print(f"Images are categorized successfully! ==> Total images processed: {processed_count} / Total images failed: {error_count}")


if __name__ == "__main__":
    # Source folders on Google Drive holding the raw training / validation images
    train_path = '/content/drive/MyDrive/dataset/training_image'
    val_path = '/content/drive/MyDrive/dataset/validation_image'

    # Root folder on Google Drive that receives the categorized copies;
    # create it when it does not exist yet
    dest_dir = '/content/drive/MyDrive/categorized_data'
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)

    # Sort the images of each split into per-style folders (training first)
    for split_path in (train_path, val_path):
        categorize_imgs(split_path, dest_dir)

Images are categorized successfully! ==> Total images processed: 1799 / Total images failed: 0
Images are categorized successfully! ==> Total images processed: 951 / Total images failed: 0


In [None]:
# Load the data
# Training images use the augmenting `transform`. Validation images get a
# deterministic pipeline (same resize and normalization, no random flip/crop)
# so validation metrics are reproducible from run to run.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

train_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/categorized_data/training_image', transform=transform)
val_dataset = datasets.ImageFolder(root='/content/drive/MyDrive/categorized_data/validation_image', transform=val_transform)

# Each ImageFolder builds its own folder-name -> label mapping; if the two
# splits disagree, validation labels would silently mean different classes.
assert train_dataset.class_to_idx == val_dataset.class_to_idx, \
    "training and validation folders define different class mappings"

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
# Show the class-name -> label-index mapping of the training and validation datasets
print(train_dataset.class_to_idx, val_dataset.class_to_idx, sep='\n')

{'athleisure': 0, 'bodyconscious': 1, 'bold': 2, 'cityglam': 3, 'classic': 4, 'disco': 5, 'ecology': 6, 'feminine': 7, 'genderless': 8, 'grunge': 9, 'hiphop': 10, 'hippie': 11, 'ivy': 12, 'kitsch': 13, 'lingerie': 14, 'lounge': 15, 'metrosexual': 16, 'military': 17, 'minimal': 18, 'mods': 19, 'normcore': 20, 'oriental': 21, 'popart': 22, 'powersuit': 23, 'punk': 24, 'space': 25, 'sportivecasual': 26}
{'athleisure': 0, 'bodyconscious': 1, 'bold': 2, 'cityglam': 3, 'classic': 4, 'disco': 5, 'ecology': 6, 'feminine': 7, 'genderless': 8, 'grunge': 9, 'hiphop': 10, 'hippie': 11, 'ivy': 12, 'kitsch': 13, 'lingerie': 14, 'lounge': 15, 'metrosexual': 16, 'military': 17, 'minimal': 18, 'mods': 19, 'normcore': 20, 'oriental': 21, 'popart': 22, 'powersuit': 23, 'punk': 24, 'space': 25, 'sportivecasual': 26}


In [None]:
# 모델 초기화 (ResNet-18)
# 사전 학습된 가중치 없이 랜덤으로 초기화
# resnet-18 모델을 전이학습 없이 직접 구현
import torch
import torch.nn as nn

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut connection.

    The main path is conv -> batch norm -> ReLU -> conv -> batch norm. Its
    output is added to the shortcut branch and passed through a final ReLU.
    The shortcut is the identity unless the stride or channel count changes,
    in which case a 1x1 convolution followed by batch norm matches the shapes.

    Args:
        in_channels: Number of channels of the input tensor.
        out_channels: Number of channels produced by both convolutions.
        stride: Stride of the first convolution (and of the projection
            shortcut, when one is needed).
    """

    # Output channels of the block = expansion * out_channels
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        block_channels = self.expansion * out_channels

        # Main path; only the first convolution carries the stride
        self.conv1 = nn.Conv2d(
            in_channels, out_channels,
            kernel_size=3, stride=stride, padding=1, bias=False,
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            out_channels, out_channels,
            kernel_size=3, stride=1, padding=1, bias=False,
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Shortcut path: an empty Sequential acts as the identity
        needs_projection = stride != 1 or in_channels != block_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, block_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(block_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        # Residual connection, then the final non-linearity
        residual = self.shortcut(x)
        out += residual
        return self.relu(out)

class ResNet18(nn.Module):
    """ResNet-18 style image classifier built from ``BasicBlock`` stages.

    The network is a 7x7 convolution stem with max pooling, four stages of two
    ``BasicBlock``s each (64, 128, 256 and 512 channels; stages 2-4 use stride
    2), adaptive average pooling to 1x1, and a linear classification layer.

    Args:
        num_classes: Number of output logits of the final linear layer.
    """

    def __init__(self, num_classes=27):
        super().__init__()
        # Channel count fed into the next residual block; advanced by _make_layer
        self.in_channels = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages (two blocks each)
        self.layer1 = self._make_layer(BasicBlock, 64, 2)
        self.layer2 = self._make_layer(BasicBlock, 128, 2, stride=2)
        self.layer3 = self._make_layer(BasicBlock, 256, 2, stride=2)
        self.layer4 = self._make_layer(BasicBlock, 512, 2, stride=2)

        # Classification head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * BasicBlock.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks.

        Only the first block receives ``stride``; the rest use the block's
        default stride. ``self.in_channels`` is updated to the stage's output
        channel count so the next stage is wired correctly.
        """
        first_block = block(self.in_channels, out_channels, stride)
        self.in_channels = out_channels * block.expansion
        remaining_blocks = [
            block(self.in_channels, out_channels) for _ in range(blocks - 1)
        ]
        return nn.Sequential(first_block, *remaining_blocks)

    def forward(self, x):
        """Return the class logits for the image batch ``x``."""
        # Stem
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        # Residual stages, in order
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        # Pool to 1x1, flatten everything after the batch dimension, classify
        pooled = self.avgpool(x)
        features = torch.flatten(pooled, 1)
        return self.fc(features)

# Build the model (default number of classes) and move it to the selected device
model = ResNet18().to(device)



In [None]:
# Optimizer (Adam over all model parameters) and loss function (cross-entropy)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

In [None]:
import torch

# 학습 함수 정의 (학습과 검증 분리)
def train_model(model, criterion, optimizer, train_loader, num_epochs=10, save_path='./model_epoch'):
    """Train ``model`` and write a checkpoint after every epoch.

    Per-batch and per-epoch loss / accuracy are printed. After each epoch the
    model's ``state_dict`` is saved to ``<save_path>/model_epoch_<N>.pth``
    (``N`` starts at 1).

    Args:
        model: Network to train; batches are moved to the device its
            parameters live on.
        criterion: Loss called as ``criterion(outputs, labels)``. The running
            epoch loss assumes it returns the mean loss over the batch.
        optimizer: Optimizer that updates ``model``'s parameters.
        train_loader: Sized iterable of ``(inputs, labels)`` batches.
        num_epochs: Number of passes over ``train_loader``.
        save_path: Directory that receives the per-epoch checkpoints; it is
            created when missing.

    Returns:
        None.
    """
    # Use the model's own device rather than depending on a notebook-level global
    device = next(model.parameters()).device

    # Checkpoints go inside save_path, so it must exist before the first save
    os.makedirs(save_path, exist_ok=True)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_corrects = 0
        total_train_samples = 0

        print(f'\nEpoch {epoch+1}/{num_epochs}')
        print('-' * 20)

        # Training step; metrics are printed for every batch
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            batch_size = inputs.size(0)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            batch_loss = loss.item()
            # Reduce once and reuse for both the batch and the epoch statistics
            batch_corrects = (preds == labels).sum().item()

            # Weight by batch size so a smaller final batch is averaged correctly
            running_loss += batch_loss * batch_size
            running_corrects += batch_corrects
            total_train_samples += batch_size

            batch_acc = batch_corrects / batch_size
            print(f'Batch {batch_idx+1}/{len(train_loader)}: Train Loss: {batch_loss:.4f}, Train Acc: {batch_acc:.4f}')

        # Epoch-level metrics
        epoch_loss = running_loss / total_train_samples
        epoch_acc = running_corrects / total_train_samples
        print(f'Epoch {epoch+1} Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f}')

        # Save the state_dict for this epoch inside save_path
        torch.save(model.state_dict(), os.path.join(save_path, f'model_epoch_{epoch+1}.pth'))

def validate_model(model, criterion, val_loader, load_path):
    """Load a saved ``state_dict`` into ``model`` and evaluate it on ``val_loader``.

    The model is switched to evaluation mode and gradients are disabled.
    Per-batch loss / accuracy and the overall validation loss / accuracy are
    printed.

    Args:
        model: Network whose parameters are replaced by the checkpoint;
            batches are moved to the device its parameters live on.
        criterion: Loss called as ``criterion(outputs, labels)``. The overall
            loss assumes it returns the mean loss over the batch.
        val_loader: Sized iterable of ``(inputs, labels)`` batches.
        load_path: Path of the ``state_dict`` file to load.

    Returns:
        None. The model is left in evaluation mode with the loaded weights.
    """
    # Use the model's own device rather than depending on a notebook-level global
    device = next(model.parameters()).device

    # map_location lets a checkpoint saved on a GPU be loaded on a CPU runtime.
    # NOTE: torch.load unpickles the file; only load checkpoints you trust.
    model.load_state_dict(torch.load(load_path, map_location=device))
    model.eval()

    val_running_loss = 0.0
    val_corrects = 0
    val_total = 0

    with torch.no_grad():
        for batch_idx, (val_inputs, val_labels) in enumerate(val_loader):
            val_inputs, val_labels = val_inputs.to(device), val_labels.to(device)
            batch_size = val_inputs.size(0)

            val_outputs = model(val_inputs)
            val_loss = criterion(val_outputs, val_labels)
            _, val_preds = torch.max(val_outputs, 1)

            val_batch_loss = val_loss.item()
            # Reduce once and reuse for both the batch and the overall statistics
            batch_corrects = (val_preds == val_labels).sum().item()

            # Weight by batch size so a smaller final batch is averaged correctly
            val_running_loss += val_batch_loss * batch_size
            val_corrects += batch_corrects
            val_total += val_labels.size(0)

            # Per-batch metrics
            val_batch_acc = batch_corrects / batch_size
            print(f'Batch {batch_idx+1}/{len(val_loader)}: Val Loss: {val_batch_loss:.4f}, Val Acc: {val_batch_acc:.4f}')

    # Overall metrics across the whole validation set
    val_epoch_loss = val_running_loss / val_total
    val_epoch_acc = val_corrects / val_total
    print(f'Validation Loss: {val_epoch_loss:.4f}, Validation Acc: {val_epoch_acc:.4f}')

In [None]:
# Google Drive location for the saved model state; passed to train_model as `save_path`
save_path = '/content/drive/MyDrive/model'

In [None]:
# Run training for 10 epochs
train_model(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
    num_epochs=10,
    save_path=save_path,
)

In [None]:
# After training, evaluate the epoch-10 checkpoint on the validation set
validate_model(
    model=model,
    criterion=criterion,
    val_loader=val_loader,
    load_path='/content/drive/MyDrive/model/model_epoch_10.pth',
)