# **Self-Supervised Learning**

## **데이터 다운로드**

In [31]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
from torch.utils.data import DataLoader, Subset, Dataset
import numpy as np
import random
import matplotlib.pyplot as plt

# 재현성 설정
random.seed(2024)
np.random.seed(2024)
torch.manual_seed(2024)
torch.cuda.manual_seed(2024)
torch.cuda.manual_seed_all(2024)

# 하이퍼파라미터 설정
BATCH_SIZE = 128
EPOCHS = 50
SSL_EPOCHS = 100
LEARNING_RATE = 0.1
SSL_LEARNING_RATE = 0.001
TAU = 0.07
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [40]:
# SimCLR용 데이터 변환 (두 개의 Augmentation 생성)
class SimCLRTransform:
    def __init__(self):
        self.transform = transforms.Compose([
            transforms.RandomResizedCrop(32),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
        ])

    def __call__(self, x):
        return self.transform(x), self.transform(x)  # 두 개의 서로 다른 변환된 이미지 반환

# CIFAR-100 데이터 로드 (Supervised Learning용)
supervised_transform = transforms.Compose([
    transforms.RandomResizedCrop(32),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5071, 0.4867, 0.4408], std=[0.2675, 0.2565, 0.2761])
])

dataset = datasets.CIFAR100(root="./data", train=True, download=True, transform=supervised_transform)
test_dataset = datasets.CIFAR100(root="./data", train=False, download=True, transform=supervised_transform)

# Test DataLoader 생성
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

# 레이블 10% 데이터 추출
num_classes = 100
labeled_indices = []
unlabeled_indices = []

for class_idx in range(num_classes):
    class_indices = np.where(np.array(dataset.targets) == class_idx)[0]
    np.random.shuffle(class_indices)
    labeled_indices.extend(class_indices[:60])   # 각 클래스당 60개 (10%)
    unlabeled_indices.extend(class_indices[60:]) # 나머지 90%

# Labeled & Unlabeled Dataset 생성
labeled_dataset = Subset(dataset, labeled_indices)

# Unlabeled 데이터에 SimCLR Transform 적용
unlabeled_dataset = datasets.CIFAR100(root="./data", train=True, download=False, transform=SimCLRTransform())

# DataLoader 생성
labeled_loader = DataLoader(labeled_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
unlabeled_loader = DataLoader(unlabeled_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


## **모델 정의**

In [33]:
# ResNet 모델 정의
class ResNetBaseline(nn.Module):
    def __init__(self):
        super(ResNetBaseline, self).__init__()
        self.model = models.resnet18(num_classes=100)

    def forward(self, x):
        return self.model(x)

# Supervised Learning 학습 (Baseline)
def train_supervised():
    model = ResNetBaseline().to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=LEARNING_RATE, momentum=0.9, weight_decay=5e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)

    for epoch in range(EPOCHS):
        model.train()
        total_loss = 0
        correct = 0
        total = 0
        for images, labels in labeled_loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

        scheduler.step()
        print(f"Epoch [{epoch+1}/{EPOCHS}], Loss: {total_loss/len(labeled_loader):.4f}, Accuracy: {100*correct/total:.2f}%")

    return model

In [42]:
# Self-Supervised Learning (SimCLR 방식)
class SimCLR(nn.Module):
    def __init__(self, base_model, out_dim=128):
        super(SimCLR, self).__init__()
        self.encoder = base_model
        self.encoder.fc = nn.Identity()
        self.projector = nn.Sequential(
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, out_dim)
        )

    def forward(self, x):
        features = self.encoder(x)
        return self.projector(features)

# SimCLR 학습 (pretrain)
def pretrain_simclr(model, dataloader, epochs=100, lr=0.001, tau=0.07, device=DEVICE):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    model.to(device)
    model.train()

    for epoch in range(epochs):
        epoch_loss = 0

        for (images1, images2), _ in dataloader:
            images1, images2 = images1.to(device), images2.to(device) # 동일한 이미지에서 서로 다른 augmentation을 거친 image
            optimizer.zero_grad()

            z1 = model(images1)
            z2 = model(images2)

            z1 = F.normalize(z1, dim=1)
            z2 = F.normalize(z2, dim=1)

            features = torch.cat([z1, z2], dim=0)  # (2N, D)

            # 유사도 계산
            cos_sim = F.cosine_similarity(features[:, None, :], features[None, :, :], dim=-1)

            # self-mask를 사용하여 같은 이미지 제거
            self_mask = torch.eye(cos_sim.shape[0], dtype=torch.bool, device=device)
            cos_sim.masked_fill_(self_mask, -9e15)

            # positive sample 위치를 결정
            pos_mask = self_mask.roll(shifts=cos_sim.shape[0] // 2, dims=0)

            # InfoNCE Loss
            # Positive Pair는 같은 이미지의 서로 다른 augmentation 결과이고, Negative Pair는 배치 내의 다른 이미지
            cos_sim = cos_sim / tau
            nll = -cos_sim[pos_mask] + torch.logsumexp(cos_sim, dim=-1)
            loss = nll.mean()

            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss / len(dataloader):.4f}")

    return model

## **모델 학습 및 평가**

In [35]:
print("Training Supervised Model...")
baseline_model = train_supervised()

Training Supervised Model...
Epoch [1/50], Loss: 5.0282, Accuracy: 2.50%
Epoch [2/50], Loss: 4.4400, Accuracy: 5.10%
Epoch [3/50], Loss: 4.1659, Accuracy: 6.27%
Epoch [4/50], Loss: 4.0249, Accuracy: 8.40%
Epoch [5/50], Loss: 3.9334, Accuracy: 9.23%
Epoch [6/50], Loss: 3.8500, Accuracy: 10.63%
Epoch [7/50], Loss: 3.8266, Accuracy: 11.15%
Epoch [8/50], Loss: 3.7648, Accuracy: 11.53%
Epoch [9/50], Loss: 3.7278, Accuracy: 12.25%
Epoch [10/50], Loss: 3.6718, Accuracy: 13.37%
Epoch [11/50], Loss: 3.6150, Accuracy: 13.37%
Epoch [12/50], Loss: 3.5596, Accuracy: 14.53%
Epoch [13/50], Loss: 3.5253, Accuracy: 14.82%
Epoch [14/50], Loss: 3.4618, Accuracy: 16.45%
Epoch [15/50], Loss: 3.4349, Accuracy: 17.32%
Epoch [16/50], Loss: 3.3774, Accuracy: 18.35%
Epoch [17/50], Loss: 3.3397, Accuracy: 19.05%
Epoch [18/50], Loss: 3.2759, Accuracy: 20.27%
Epoch [19/50], Loss: 3.2402, Accuracy: 19.93%
Epoch [20/50], Loss: 3.1743, Accuracy: 22.07%
Epoch [21/50], Loss: 3.1356, Accuracy: 21.97%
Epoch [22/50], Loss

In [None]:
print("Pretraining SSL Model...")
ssl_encoder = pretrain_simclr(SimCLR(models.resnet18()), unlabeled_loader, epochs=SSL_EPOCHS, lr=SSL_LEARNING_RATE, tau=TAU)

Pretraining SSL Model...
Epoch [1/100], Loss: 1.9321
Epoch [2/100], Loss: 1.2230
Epoch [3/100], Loss: 0.9996
Epoch [4/100], Loss: 0.8623
Epoch [5/100], Loss: 0.7896
Epoch [6/100], Loss: 0.7269
Epoch [7/100], Loss: 0.6884


In [None]:
print("Fine-tuning SSL Model...")
ssl_model = train_supervised()

In [None]:
# 모델 평가 함수
def evaluate_model(model, dataloader):
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    accuracy = 100 * correct / total
    return accuracy

# Baseline 모델 평가
baseline_acc = evaluate_model(baseline_model, test_loader)

# SSL 모델 평가
ssl_acc = evaluate_model(ssl_model, test_loader)

# 결과 출력
print(f"Baseline Model Test Accuracy: {baseline_acc:.2f}%")
print(f"SSL Model Test Accuracy: {ssl_acc:.2f}%")

## **분석 및 고찰**

### **모델 정의**

- **Baseline 모델**: 주어진 ResNet18 모델을 사용하여 10% labeled data만으로 학습하였습니다.

- **Self-Supervised Learning**: SimCLR 방식을 사용하여 contrastive pretraining을 수행한 후, 10% labeled data로 fine-tuning 하였습니다.

### **데이터셋 준비 (CIFAR-100)**
- CIFAR-100은 100개의 클래스를 포함하며, 각 클래스당 600개의 샘플이 존재합니다.
- 각 클래스에서 10%에 해당하는 60개 샘플만 선택하여 **labeled dataset**을 구성합니다.
- 나머지 540개 샘플은 **unlabeled dataset**으로 저장합니다.
- 10% labeled data는 supervised learning과 fine-tuning에 사용했습니다.
- 90% unlabeled data는 contrastive learning을 통해 pretraining에 활용했습니다.

### **Self-Supervised Learning**
- SimCLR 방식으로 **Contrastive Learning**을 수행하여 ResNet18의 Encoder를 Pretraining 하였습니다.

### **모델 학습 결과**
충분한 epoch를 설정하지 않아 train set에 대해서도 완전히 학습하지 못하는 결과를 초래했습니다. 하지만, 성능을 올리는 과제가 아닌 동일한 환경에서 self-supervised learning의 성능을 평가하기 위한 과제이기 때문에 추가적인 학습을 진행하진 않았습니다.
