In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import os
import numpy as np
import random
import time
import sys
import matplotlib.pyplot as plt

from PIL import Image
from torchvision import datasets , transforms
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import CosineAnnealingLR

Random seed 고정

In [2]:
# Seed every random number generator used in this notebook (Python, NumPy,
# PyTorch CPU and all CUDA devices) with the same value.
seed = 40
deterministic = True

for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed_all):
    _seed_fn(seed)

if deterministic:
    # Request deterministic cuDNN behaviour and switch its benchmark mode off.
    torch.backends.cudnn.deterministic, torch.backends.cudnn.benchmark = True, False

In [3]:
# Use the GPU when PyTorch reports one as available; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

google drive mount

In [4]:
# Mount Google Drive at /content/drive; the data paths used by later cells
# all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


데이터셋 가져오기

In [5]:
# Create the folder that will hold the dataset (no error if it already exists).
data_root = '/content/drive/MyDrive/imagesnet_data'
os.makedirs(data_root, exist_ok=True)

In [None]:
# Run downloader.py from the ImageNet-Datasets-Downloader checkout on Drive,
# requesting 20 classes with 20 images each, written under the data folder.
!python /content/drive/MyDrive/ImageNet-Datasets-Downloader-master/downloader.py \
    -data_root /content/drive/MyDrive/imagesnet_data \
    -number_of_classes 20 \
    -images_per_class 20

손상된 이미지 자동 삭제

In [171]:
# Root folder of the downloaded images.
root_dir = "/content/drive/MyDrive/imagesnet_data/imagenet_images"

# Walk the whole tree and delete every file that Pillow cannot open or verify.
for subdir, _, files in os.walk(root_dir):
    for file in files:
        file_path = os.path.join(subdir, file)
        try:
            # Opening parses the header; verify() then checks the file without
            # decoding the pixel data. The context manager closes the handle
            # before the file is removed below.
            with Image.open(file_path) as img:
                img.verify()
        except (OSError, SyntaxError, Image.DecompressionBombError) as e:
            # OSError already covers IOError (its alias in Python 3).
            # SyntaxError is included because some Pillow format plugins
            # (e.g. PNG) report a broken file that way; without it a single
            # bad file would abort the whole scan instead of being removed.
            print(f"Deleting corrupted image: {file_path}, Error: {e}")
            os.remove(file_path)

KeyboardInterrupt: 

두 개의 관점(논문에 나온 이미지처럼)을 얻어야 하므로, 하나의 이미지에 대해 augmentation을 두 번 적용한다.

ResNet50이 backbone으로 사용

In [7]:
# Install and import timm, which supplies the pretrained ResNet-50 created in the next cell.
!pip install timm
import timm



In [160]:
# Two independent pretrained ResNet-50 instances: resnet_1 is trained in the
# "Linear evaluation" section and resnet_2 in the "Matrix-SSL" section.
resnet_1, resnet_2 = (
    timm.create_model('resnet50', pretrained=True) for _ in range(2)
)

최종 레이어(backbone(=resnet50)의 encoder로 사용된다) 제거

In [161]:
# Remove the existing `fc` attribute from both networks; a new head is
# assigned to `fc` in the following cell.
delattr(resnet_1, "fc")
delattr(resnet_2, "fc")

BN이 포함된 three layer MLP를 사용한다

In [162]:
def _make_mlp_head(width=2048, depth=3):
    """Return `depth` consecutive Linear -> BatchNorm1d -> ReLU stages, each `width` units wide."""
    stages = []
    for _ in range(depth):
        stages.extend([nn.Linear(width, width), nn.BatchNorm1d(width), nn.ReLU()])
    return nn.Sequential(*stages)


# Both networks get their own, separately initialised, three-stage MLP head.
resnet_1.fc = _make_mlp_head()
resnet_2.fc = _make_mlp_head()

Freeze all model parameters, then unfreeze only the final block (the MLP head `fc`).

In [163]:
# Turn gradients off for every parameter of each network, then switch them
# back on for the parameters of its `fc` head only.
for _net in (resnet_1, resnet_2):
    _net.requires_grad_(False)
    _net.fc.requires_grad_(True)

resnet50 데이터 정규화 리스트


In [164]:
# Per-channel mean / standard deviation handed to transforms.Normalize in the
# augmentation pipelines below.
mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

이미지 전처리

In [165]:
# Augmentation pipeline for the supervised baseline loader.
transform = transforms.Compose([
    transforms.Resize((256, 256)),  # resize to a fixed size first
    transforms.RandomResizedCrop(224),  # then take a random crop, rescaled to 224x224
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),  # colour jitter
    transforms.RandomGrayscale(p=0.2),  # convert to grayscale with probability 0.2
    transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 2.0)),  # Gaussian blur
    transforms.RandomHorizontalFlip(p=0.5),  # horizontal flip with probability 0.5
    transforms.ToTensor(),  # PIL image -> tensor, must precede Normalize
    # Use the shared `mean` / `std` defined in the previous cell (same values
    # as before) so all three pipelines stay in sync if they are ever changed.
    transforms.Normalize(mean=mean, std=std),
])

In [166]:
# First view: colour distortion, random grayscale, random crop and horizontal flip.
aug1 = transforms.Compose([
    # Resize takes (height, width). The previous (672, 224) stretched every
    # image to a 3:1 aspect ratio before cropping; use the same square
    # pre-resize as the baseline `transform` pipeline instead.
    transforms.Resize((256, 256)),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1),  # colour jitter
    transforms.RandomGrayscale(p=0.2),  # convert to grayscale with probability 0.2
    transforms.RandomResizedCrop(224),  # random crop, rescaled to 224x224
    transforms.RandomHorizontalFlip(p=0.5),  # horizontal flip with probability 0.5
    transforms.ToTensor(),  # PIL image -> tensor, must precede Normalize
    transforms.Normalize(mean=mean, std=std)  # per-channel normalisation
])

In [167]:
# Second view: Gaussian blur followed by a random crop.
aug2 = transforms.Compose([
    # Resize takes (height, width). The previous (672, 224) stretched every
    # image to a 3:1 aspect ratio before cropping; use the same square
    # pre-resize as the baseline `transform` pipeline instead.
    transforms.Resize((256, 256)),
    transforms.GaussianBlur(kernel_size=(5, 5), sigma=(0.1, 5)),  # Gaussian blur
    transforms.RandomResizedCrop(224),  # random crop, rescaled to 224x224
    transforms.ToTensor(),  # PIL image -> tensor, must precede Normalize
    transforms.Normalize(mean=mean, std=std)  # per-channel normalisation
])

훈련 loop

optimizer : LARS

---


batch size : 4096

---


momentum : 0.9

---


weight_decay : None

---


learning rate : base 0.03 , cosine decay learning rate scheduler

---


epoch : 100

optimizer 설치

In [116]:
# Install and import torchlars, whose LARS class wraps the base optimizers created below.
!pip install torchlars
from torchlars import LARS



data_loading

In [168]:
# ImageFolder creates one class per immediate sub-directory of `root`. The
# clean-up cell scans `<data_root>/imagenet_images`, so the class folders are
# presumably stored there; pointing ImageFolder at `data_root` itself would
# then yield a single class named "imagenet_images".
# NOTE(review): confirm the on-disk layout produced by the downloader.
image_root = os.path.join(data_root, "imagenet_images")

# Supervised baseline: augmented, normalised tensors.
dataset = datasets.ImageFolder(root=image_root, transform=transform)
dataloader = DataLoader(dataset, batch_size=1024, shuffle=True)

# Matrix-SSL: the two augmented views are produced inside the training loop,
# so this loader only has to deliver the raw images. `transform=False` is not
# a valid "no transform" value (ImageFolder calls any transform that is not
# None), and raw PIL images of differing sizes cannot be collated into a
# batch, so resize to a fixed size and convert to an un-normalised tensor.
proposed_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])
proposed_dataset = datasets.ImageFolder(root=image_root, transform=proposed_transform)
proposed_dataloader = DataLoader(proposed_dataset, batch_size=1024, shuffle=True)

Linear evaluation

In [169]:
# Optimizer for the head of resnet_1, following the recipe listed in the
# "training loop" notes of this notebook: LARS, momentum 0.9, no weight
# decay, base learning rate 0.03 with cosine decay over 100 epochs.
# The base optimizer is SGD with momentum (previously Adam, which has no
# momentum=0.9 setting and does not match that recipe).
base_optimizer_linear = torch.optim.SGD(resnet_1.fc.parameters(), lr=0.03, momentum=0.9)
optimizer_linear = LARS(optimizer=base_optimizer_linear, trust_coef=0.001)
scheduler_linear = CosineAnnealingLR(optimizer_linear, T_max=100, eta_min=0.0003)

In [None]:
cnt = 0
loss_history = []
test_loss_history = []
ce_loss = nn.CrossEntropyLoss()
resnet_1 = resnet_1.to(device)

# Train resnet_1 with cross-entropy for 100 epochs; the learning-rate
# scheduler is advanced once per epoch.
resnet_1.train()
for epoch in range(100):
  # Scalar loss of the most recent batch; stays NaN if the loader yields
  # nothing, so the epoch summary below can never hit an undefined name.
  last_loss = float("nan")
  for images, labels in dataloader:
    images = images.to(device)
    labels = labels.to(device)

    output = resnet_1(images)
    loss = ce_loss(output, labels)

    # Backward and optimize
    optimizer_linear.zero_grad()
    loss.backward()
    optimizer_linear.step()

    # Record a plain Python float rather than the tensor.
    last_loss = loss.item()
    loss_history.append(last_loss)

  scheduler_linear.step()
  # Report the scalar value, not the tensor repr with its grad_fn.
  print("epoch : {} , loss : {}".format(epoch, last_loss))

Matrix-SSL

In [None]:
# Optimizer for the head of resnet_2, following the recipe listed in the
# "training loop" notes of this notebook: LARS, momentum 0.9, no weight
# decay, base learning rate 0.03 with cosine decay over 100 epochs.
# The base optimizer is SGD with momentum (previously Adam, which has no
# momentum=0.9 setting and does not match that recipe).
base_optimizer_propose = torch.optim.SGD(resnet_2.fc.parameters(), lr=0.03, momentum=0.9)
optimizer_propose = LARS(optimizer=base_optimizer_propose, trust_coef=0.001)
scheduler_propose = CosineAnnealingLR(optimizer_propose, T_max=100, eta_min=0.0003)

$\text{align\_loss} = -\operatorname{tr}\big(c(z_1, z_2)\big) + \gamma \cdot \mathrm{MCE}\big(c(z_1, z_1),\, c(z_2, z_2)\big)$

---


$\text{uniform\_loss} = \mathrm{MCE}\big(\tfrac{1}{d} I_d,\; c(z_1, z_2)\big)$, where $\mathrm{MCE}(p, q) = \operatorname{tr}(-p \log q + q)$

In [None]:
cnt = 0
loss_history = []
test_loss_history = []
ce_loss = nn.CrossEntropyLoss()
resnet_2 = resnet_2.to(device)


def _augment_batch(batch, augmentation):
  """Apply a PIL-based augmentation pipeline to each image of a tensor batch and re-stack the results."""
  to_pil = transforms.ToPILImage()
  return torch.stack([augmentation(to_pil(image)) for image in batch])


# Train resnet_2 on two augmented views of every batch; labels are not used.
resnet_2.train()
for epoch in range(100):
  # Scalar loss of the most recent batch; stays NaN if the loader yields nothing.
  last_loss = float("nan")
  for images, _ in proposed_dataloader:

    # aug1 / aug2 end with ToTensor, so they must be fed PIL images one at a
    # time rather than a batched tensor. Each image is converted back to PIL,
    # augmented independently, and the views are stacked and moved to the device.
    # Assumes the loader yields CPU float image tensors in [0, 1] — TODO confirm.
    z_1 = _augment_batch(images, aug1).to(device)
    z_2 = _augment_batch(images, aug2).to(device)

    output_1 = resnet_2(z_1)
    output_2 = resnet_2(z_2)

    # Trace-based matrix cross-entropy:
    # MCE(p,q) = tr(-plog(q) + q)

    # TODO: implement both terms from output_1 / output_2 using the formulas
    # in the markdown cell above.
    align_loss = ...
    uniform_loss = ...
    if align_loss is Ellipsis or uniform_loss is Ellipsis:
      # Fail with an explicit message instead of the TypeError that adding
      # two Ellipsis placeholders would raise.
      raise NotImplementedError("align_loss / uniform_loss are not implemented yet")

    loss = align_loss + uniform_loss

    # Backward and optimize with resnet_2's own optimizer (previously this
    # stepped optimizer_linear, which holds resnet_1's parameters).
    optimizer_propose.zero_grad()
    loss.backward()
    optimizer_propose.step()

    # Record a plain Python float rather than the tensor.
    last_loss = loss.item()
    loss_history.append(last_loss)

  # Advance the scheduler attached to optimizer_propose (previously scheduler_linear).
  scheduler_propose.step()
  print("epoch : {} , loss : {}".format(epoch, last_loss))