In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms, models
from torchvision.transforms import autoaugment
from torch.utils.data import DataLoader, random_split
from torch.optim.lr_scheduler import CosineAnnealingLR, ReduceLROnPlateau
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

torch.manual_seed(777)
if device == 'cuda':
    torch.cuda.manual_seed_all(777)

In [None]:
!pip install albumentations

In [None]:
from torch.utils.data import Dataset, DataLoader, ConcatDataset, random_split
from PIL import Image
import os
import torchvision.transforms as transforms
import albumentations as A
import albumentations.pytorch


class CustomImageDataset(Dataset):
    def __init__(self, img_dir, transform=None, label=0):
        self.img_dir = img_dir
        self.transform = transform
        self.label = label
        self.img_paths = [
            os.path.join(img_dir, img_file)
            for img_file in os.listdir(img_dir)
            if img_file.lower().endswith(("png", "jpg", "jpeg"))
        ]

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        image = Image.open(img_path).convert("RGB")
        if self.transform:
            image = self.transform(image=np.array(image))['image']
        return image, self.label


# 이미지 경로
root_dir_train = (
    "/content/drive/MyDrive/CDAL/summer2winter_yosemite/train"
)
root_dir_test = (
    "/content/drive/MyDrive/CDAL/summer2winter_yosemite/test"
)


# 데이터 증강 및 전처리
transform_train = A.Compose([
    A.Resize(256, 256),
    A.HorizontalFlip(p=0.85),
    A.RandomRotate90(p=0.85),
    A.VerticalFlip(p=0.85),
    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    A.RandomCrop(width=224, height=224),  # 이미지 크기에 맞게 조정 필요
    A.OneOf([
        A.MotionBlur(p=0.85),
        A.OpticalDistortion(p=0.85),
        A.GaussNoise(p=0.85)
    ], p=0.85),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    A.pytorch.transforms.ToTensorV2(),
])


transform_test = A.Compose([
    A.Resize(256, 256),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    A.pytorch.transforms.ToTensorV2(),
])

In [None]:
# 데이터셋 생성
train_dataset_summer = CustomImageDataset(
    os.path.join(root_dir_train, "train_summer"), transform=transform_train, label=0
)
train_dataset_winter = CustomImageDataset(
    os.path.join(root_dir_train, "train_winter"), transform=transform_train, label=1
)

test_dataset_summer = CustomImageDataset(
    os.path.join(root_dir_test, "test_summer"), transform=transform_test, label=0
)
test_dataset_winter = CustomImageDataset(
    os.path.join(root_dir_test, "test_winter"), transform=transform_test, label=1
)

# 데이터셋 합치기
train_dataset = ConcatDataset([train_dataset_summer, train_dataset_winter])
test_dataset = ConcatDataset([test_dataset_summer, test_dataset_winter])

# 검증 데이터셋 분할
val_split = 0.2
train_size = int((1 - val_split) * len(train_dataset))
val_size = len(train_dataset) - train_size
train_dataset_split, val_dataset_split = random_split(
    train_dataset, [train_size, val_size]
)

# DataLoader 생성
train_loader = DataLoader(train_dataset_split, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset_split, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# ResNet50 모델 로드 및 수정
model = models.resnet18(pretrained=True).to(device)

# 모든 파라미터를 먼저 고정하고 특정 레이어의 파라미터를 학습 가능하게 설정
for param in model.parameters():
    param.requires_grad = False

for param in model.layer4.parameters():
    param.requires_grad = True

num_ftrs = model.fc.in_features
model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(num_ftrs, 2)
).to(device)

#  옵티마이저와 스케줄러 설정
optimizer = optim.Adam([
    {'params': model.layer4.parameters(), 'lr': 1e-4},
    {'params': model.fc.parameters(), 'lr': 1e-3}
], lr=1e-3)

# 스케줄러 설정
scheduler = CosineAnnealingLR(optimizer, T_max=len(train_loader), eta_min=0, last_epoch=-1)


# 손실 함수 설정
criterion = nn.CrossEntropyLoss()

In [None]:
# 필요한 경우 TensorBoard를 설치합니다.
!pip install tensorboard

In [None]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard를 위한 SummaryWriter 초기화
writer = SummaryWriter('runs/summer2winter_yosemite_experiment')

In [None]:
dataiter = iter(train_loader)
images, labels = next(dataiter)
img_grid = torchvision.utils.make_grid(images)

writer.add_image('train', img_grid)

writer.add_graph(model, images.cuda())


In [None]:
# 학습 함수
def train(model, train_loader, device, criterion, optimizer, epoch, writer):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(output.data, 1)
        total += target.size(0)
        correct += (predicted == target).sum().item()

        train_accuracy = 100. * correct / total

        global_step = (epoch - 1) * len(train_loader) + batch_idx
        writer.add_scalar('Loss/train', loss.item(), global_step)
        writer.add_scalar('Accuracy/train', train_accuracy, global_step)

    train_loss = running_loss / len(train_loader)
    print(f'Train Epoch: {epoch}, Loss: {train_loss:.6f}, Acc: {train_accuracy:.2f}%')

In [None]:
# 검증 함수
def validate(model, val_loader, device, criterion, epoch, writer):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(val_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            val_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

            val_accuracy = 100. * correct / total

            global_step = (epoch - 1) * len(val_loader) + batch_idx
            writer.add_scalar('Loss/val', loss.item(), global_step)
            writer.add_scalar('Accuracy/val', val_accuracy, global_step)

    val_loss /= len(val_loader)

    print(f'Validation Epoch: {epoch}, Loss: {val_loss:.6f}, Acc: {val_accuracy:.2f}%')

In [None]:
# 테스트 함수
def test(model, device, test_loader, epoch, writer):
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    test_loss /= len(test_loader)
    test_accuracy = 100. * correct / total
    print(f'\nTest set: Accuracy: {test_accuracy:.2f}%\n')



In [None]:
# 주어진 설정으로 모델 훈련
num_epochs = 50
for epoch in range(1, num_epochs + 1):
    train(model, train_loader, device, criterion, optimizer, epoch, writer)
    validate(model, val_loader, device, criterion, epoch, writer)
    # scheduler.step()


test(model, device, test_loader, epoch, writer)

%load_ext tensorboard
%tensorboard --logdir=runs

# 학습이 끝나면 SummaryWriter를 닫음
writer.close()
