In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader

import torchvision
import torchvision.transforms as T
from torchvision.models import resnet18

import numpy as np
from tqdm import tqdm

import os
import time


BATCH_SIZE = 64
EPOCHS = 100
TEMPERATURE = 0.1  
LR = 3e-4         
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
MESSAGE = "test_temperature" 

# Transformacije kroz koje prolazi svaka slika dataseta
transform = T.Compose([
    T.RandomResizedCrop(32),
    T.RandomHorizontalFlip(),
    T.RandomApply([T.ColorJitter(0.5, 0.5, 0.5, 0.1)], p=0.8),
    T.RandomGrayscale(p=0.2),
    T.ToTensor()
])

# Klasa koja ce od jedne slike iz dataseta da vrati dve njene augmentacije.
# Bitno je primetiti da iako CIFAR10 ima labele mi cemo ih ignorisati 
# prilikom ucenja modela SimCLR (jer spada u self-supervised learning).
class SimCLRDataset(torch.utils.data.Dataset):

    def __init__(self, dataset):
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        x, _ = self.dataset[index]
        return transform(x), transform(x)

# Priprema dataseta
cifar = torchvision.datasets.CIFAR10(root = "./data", train = True, download = True)
cifar_small = torch.utils.data.Subset(cifar, np.arange(10000))
dataset = SimCLRDataset(cifar_small)
dataload = DataLoader(dataset, batch_size = BATCH_SIZE, shuffle = True, drop_last = True, num_workers = 4, pin_memory = True)

# Pravimo klasu modela za SimCLR. 
# Sastoji se od enkodera za koji smo uzeli resnet18 mrezu bez zadnjeg FC sloja (jer ga ne ucimo klasifikaciju).
# Na izlaz iz enkodera se nadovezuje projekcioni sloj koji smanjuje dimenziju izlaza enkodera,
# ali takodje primenjuje i nelinearnu transformaciju nad podacima.
class SimCLRModel(nn.Module):

    def __init__(self):
        super().__init__()

        base = resnet18(pretrained = False)

        base.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        base.maxpool = nn.Identity()
        
        self.encoder = nn.Sequential(*list(base.children())[:-1])
        self.projection = nn.Sequential(
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, 64)
        )

    def forward(self, x):
        h = self.encoder(x).squeeze()
        z = self.projection(h)
        return z

# Definisemo model i optimizator. 
# Bitno da model i podatke s kojima radi cuvamo na istom mestu
# zato im svima eksplicitno prosledjujemo DEVICE.
model = SimCLRModel().to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr = LR)

# Funkcija koja racuna gubitak SimCLR algrotima zasnovana na NT-Xent loss.
# zi i zj su izlazi modela za dva susedna batcha. 
# sim_matrix je matrica slicnosti izmedju slika iz oba batcha. Svako polje Si,j sadrzi kosinusnu slicnost izmedju vektora slika i i j.
# Posto je svaki vektor slican sam sebi, na dijagonali ove matrice ce se nalaziti jedinice,
# ali to nije podatak koji treba da koristimo za racun zato ga stavljamo na dovoljno mali broj da ne utice na rezultat.
# positives je vektor koji za svaki indeks slike prikazuje na kom se indeksu nalazi pozitivni par te slike
def calculate_loss(zi, zj, temperature):

    n = zi.size(0)
    z = torch.cat([zi,zj], dim=0)
    z = F.normalize(z, dim=1)

    sim_matrix = torch.matmul(z, z.T)
    sim_matrix = sim_matrix / temperature

    mask = torch.eye(2*n, dtype = torch.bool, device = z.device)
    sim_matrix = sim_matrix.masked_fill(mask, -9999999)

    positives = torch.cat([torch.arange(n, 2*n), torch.arange(0, n)]).to(DEVICE)

    loss = F.cross_entropy(sim_matrix, positives)
    return loss
    

# Pravljenje log fajla za cuvanje setup-a i pracenje rezultata
save_dir = "saved_models"
os.makedirs(save_dir, exist_ok = True)

log_path = os.path.join(save_dir, f"log_{MESSAGE}_{time.strftime('%Y%m%d-%H%M%S')}.txt")
with open(log_path, "w") as f:
    f.write("SimCLR Training Log\n")
    f.write(f"MESSAGE: {MESSAGE}\n")
    f.write(f"BATCH_SIZE: {BATCH_SIZE}\n")
    f.write(f"EPOCHS: {EPOCHS}\n")
    f.write(f"TEMPERATURE: {TEMPERATURE}\n")
    f.write(f"LEARNING_RATE: {LR}\n")
    f.write(f"DEVICE: {DEVICE}\n")
    f.write(f"MODEL_SAVE_DIR: {save_dir}\n")
    f.write("-" * 40 + "\n\n")

# Pripremamo i zapocinjemo trening modela
# Cuvamo istreninarni model iz zadnje epohe
model.train()
    
for epoch in range(EPOCHS):

    total_loss = 0
    for (xi, xj) in tqdm(dataload):

        xi, xj = xi.to(DEVICE), xj.to(DEVICE)
        zi = model(xi)
        zj = model(xj)

        loss = calculate_loss(zi, zj, TEMPERATURE)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f"epoch: {epoch+1} loss: {total_loss/len(dataload):.4f}")

    if epoch == EPOCHS - 1:
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        model_path = os.path.join(save_dir, f"simclr_{MESSAGE}_{timestamp}.pt")
        torch.save(model.state_dict(), model_path)

    avg_loss = total_loss / len(dataload)
    log_line = f"Epoch {epoch+1}: loss = {avg_loss:.4f}\n"

    with open(log_path, "a") as f:
        f.write(log_line)



##### LINEARNA EVALUACIJA #######
# Ovo nije deo SimCLR algortima vec test rada na jednoj od mogucih njegovih primena.
# Konkretno nauceni model cemo iskoristiti na problem klasifikacije.
# Enkoder modela zamrzavamo tako da ne uci nista vise, a projekcionu glavu odbacujemo.
# Pravimo jednu linearnu mrezu koja spaja samo izlaz iz enkodera sa klasama.
# Potreban je uraditi i trening te mreze ali je to jednostavniji i vremenski dosta kraci proces jer su i dimenzije mnogo manje.

# Ucitavanje slika i pravljenje setova za trening i test linearne evaluacije.
cifar_train = torchvision.datasets.CIFAR10(root = "./data", train = True, download = True, transform = T.ToTensor())
cifar_test = torchvision.datasets.CIFAR10(root = "./data", train = False, download = True, transform = T.ToTensor())

train_load = DataLoader(cifar_train, batch_size = BATCH_SIZE, shuffle = True, num_workers = 4, pin_memory = True)
test_load = DataLoader(cifar_test, batch_size = BATCH_SIZE, shuffle = False, num_workers = 2, pin_memory = True)

# Zamrzavanje parametara modela naucenog SimCLR algoritmom.
for param in model.encoder.parameters():
    param.requires_grad = False
    
# Mreza linearne klasifikacije.
class LinearClassifier(nn.Module):

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(512, 10)

    def forward(self, x):
        return self.fc(x)

# Definisanje modela klasifikatora, optimizatora za njega, kao i funkcije koja ce racunati loss 
classifier = LinearClassifier().to(DEVICE)
optimizer_cl = torch.optim.Adam(classifier.parameters(), lr=1e-3)
loss_cl = nn.CrossEntropyLoss()

# Trening klasifikatora.
# Dovoljno je da koristimo manji broj epoha jer vrednosti rano krenu da konvergiraju oko slicne vrednosti.
classifier.train()
for epoch in range(50):
    
    total_loss = 0
    for x, y in train_load:
        x, y = x.to(DEVICE), y.to(DEVICE)
        h = model.encoder(x).squeeze()
        pred = classifier(h)
        loss = loss_cl(pred, y)

        optimizer_cl.zero_grad()
        loss.backward()
        optimizer_cl.step()
        total_loss += loss.item()

    print(f"Linear epoch {epoch+1}, loss: {total_loss / len(train_load):.4f}")
    linear_log = f"Linear Epoch {epoch+1}: loss = {total_loss / len(train_load):.4f}\n"
    with open(log_path, "a") as f:
        f.write(linear_log)


# Konacna evaluacija koja radi sa test podacima.
# Kao rezultat dobijamo konacan broj u procentima uspesnosti rada algoritma.
def evaluate(model, classifier, dataloader):
    model.eval()
    classifier.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            h = model.encoder(x).squeeze()
            out = classifier(h)
            _, predicted = out.max(1)
            correct += (predicted == y).sum().item()
            total += y.size(0)

    print(f"Test accuracy: {correct / total * 100:.2f}%")
    test_log = f"Linear Evaluation Test Accuracy: {correct / total * 100:.2f}%\n"
    with open(log_path, "a") as f:
        f.write(test_log)


evaluate(model, classifier, test_load)