In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import CIFAR10

In [None]:
class DualAugment:
    """Callable transform that produces two augmented views of a single image.

    The same stochastic pipeline (``self.aug``) is run twice on the input, so
    the random crop / flip / colour-jitter / grayscale steps are applied
    separately for each returned view.
    """

    def __init__(self):
        # Per-channel statistics applied by Normalize after ToTensor().
        channel_mean = [0.4914,0.4822,0.4465]
        channel_std = [0.2023,0.1994,0.2010]

        jitter = transforms.ColorJitter(0.8,0.8,0.8,0.2)
        pipeline = [
            transforms.RandomResizedCrop(32, scale=(0.2, 1.0)),
            transforms.RandomHorizontalFlip(p=0.5),
            # Colour jitter is itself only applied with probability 0.8.
            transforms.RandomApply([jitter], p=0.8),
            transforms.RandomGrayscale(p=0.2),
            transforms.ToTensor(),
            transforms.Normalize(mean=channel_mean, std=channel_std),
        ]
        self.aug = transforms.Compose(pipeline)

    def __call__(self, x):
        """Return ``(first_view, second_view)``, each from its own pass through ``self.aug``."""
        first_view = self.aug(x)
        second_view = self.aug(x)
        return first_view, second_view

# CIFAR-10 training split; DualAugment makes every sample a pair of augmented views.
train_dataset = CIFAR10(
    root='./data',
    train=True,
    download=True,
    transform=DualAugment(),
)

# Shuffled loader for pre-training; drop_last=True skips the final incomplete batch.
train_loader = DataLoader(
    train_dataset,
    batch_size=512,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
    drop_last=True,
)


In [None]:
class BasicBlock(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm stages summed with a shortcut branch.

    Args:
        in_c: Number of input channels.
        out_c: Number of output channels.
        stride: Stride of the first 3x3 convolution, and of the 1x1 shortcut
            convolution when one is created.
    """

    def __init__(self, in_c, out_c, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_c)
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_c)

        # An empty Sequential acts as the identity; a 1x1 conv + BN projection is
        # only built when the main branch changes resolution or channel count.
        needs_projection = (stride != 1) or (in_c != out_c)
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_c, out_c, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_c),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply conv-BN-ReLU, conv-BN, add the shortcut of ``x``, then a final ReLU."""
        main = nn.functional.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))
        merged = main + self.shortcut(x)
        return nn.functional.relu(merged)

In [None]:
class ResNet18(nn.Module):
    """ResNet-18 style feature extractor without a classification head.

    The stem is a single 3x3, stride-1 convolution followed by batch norm and
    ReLU. Four stages of two ``BasicBlock`` units follow, with 64, 128, 256 and
    512 channels; stages ``l2``-``l4`` are built with stride 2 in their first
    block.

    ``forward`` returns one 512-dimensional feature vector per input image.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, 1, 1, bias=False)
        self.bn1   = nn.BatchNorm2d(64)
        self.l1 = nn.Sequential(BasicBlock(64,64), BasicBlock(64,64))
        self.l2 = nn.Sequential(BasicBlock(64,128,2), BasicBlock(128,128))
        self.l3 = nn.Sequential(BasicBlock(128,256,2), BasicBlock(256,256))
        self.l4 = nn.Sequential(BasicBlock(256,512,2), BasicBlock(512,512))

    def forward(self, x):
        """Map a batch of 3-channel images to a ``(batch, channels)`` feature matrix.

        Args:
            x: Image batch with 3 channels (``conv1`` requires 3 input channels).

        Returns:
            The output of the last stage, globally average-pooled over its
            spatial dimensions and flattened to two dimensions.
        """
        x = torch.relu(self.bn1(self.conv1(x)))
        for stage in (self.l1, self.l2, self.l3, self.l4):
            x = stage(x)
        # Global pooling instead of a fixed 4x4 window: identical for 32x32 inputs
        # (where the final map is 4x4) but keeps the output width at the channel
        # count for any other input resolution.
        x = nn.functional.adaptive_avg_pool2d(x, 1)
        return torch.flatten(x, 1)

In [None]:
class SimSiam(nn.Module):
    """Siamese wrapper: shared backbone, 3-layer projection MLP and 2-layer predictor MLP.

    Args:
        backbone: Module mapping an input batch to a ``(batch, feat_dim)`` matrix.
        feat_dim: Width of the backbone output fed to the projection MLP.
        proj_dim: Width of every projection layer and of the predictor output.
        pred_dim: Width of the predictor's hidden (bottleneck) layer.

    The defaults (512 / 2048 / 512) reproduce the previously hard-coded sizes,
    so ``SimSiam(backbone)`` builds the same architecture as before.
    """

    def __init__(self, backbone, feat_dim=512, proj_dim=2048, pred_dim=512):
        super().__init__()
        self.backbone = backbone
        self.projection = nn.Sequential(
            nn.Linear(feat_dim, proj_dim, bias=False), nn.BatchNorm1d(proj_dim), nn.ReLU(),
            nn.Linear(proj_dim, proj_dim, bias=False), nn.BatchNorm1d(proj_dim), nn.ReLU(),
            # Final projection layer: batch norm without learnable scale/shift, no ReLU.
            nn.Linear(proj_dim, proj_dim, bias=False), nn.BatchNorm1d(proj_dim, affine=False),
        )
        self.predictor = nn.Sequential(
            nn.Linear(proj_dim, pred_dim, bias=False), nn.BatchNorm1d(pred_dim), nn.ReLU(),
            # Output layer keeps its bias and has no normalisation or activation.
            nn.Linear(pred_dim, proj_dim),
        )

    def forward(self, x1, x2):
        """Encode two views and return predictions plus detached projections.

        Args:
            x1: First batch of views.
            x2: Second batch of views.

        Returns:
            ``(p1, p2, z1, z2)`` where ``p*`` are predictor outputs and ``z*``
            are the projections returned via ``.detach()``, so no gradient
            flows back through the returned ``z*`` tensors.
        """
        z1 = self.projection(self.backbone(x1))
        z2 = self.projection(self.backbone(x2))
        p1, p2 = self.predictor(z1), self.predictor(z2)
        return p1, p2, z1.detach(), z2.detach()

def loss_fn(p, z):
    """Return the negative mean cosine similarity between matching rows of ``p`` and ``z``.

    Both inputs are L2-normalised along ``dim=1``; the per-row dot products are
    then averaged and negated, giving a scalar tensor.
    """
    unit_p = nn.functional.normalize(p, dim=1)
    unit_z = nn.functional.normalize(z, dim=1)
    cosine = torch.sum(unit_p * unit_z, dim=1)
    return -cosine.mean()


In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Tunables for this cell. PRETRAIN_EPOCHS drives both the loop and the progress
# line, so the two can no longer drift apart.
SEED = 0
PRETRAIN_EPOCHS = 100

# Seed PyTorch's global RNG so weight initialisation is repeatable across
# Restart & Run All. NOTE(review): this alone does not guarantee bit-identical
# runs on GPU or with multi-worker data loading.
torch.manual_seed(SEED)

backbone = ResNet18().to(device)
model = SimSiam(backbone).to(device)
optimizer = optim.SGD(model.parameters(), lr=0.06, momentum=0.9, weight_decay=5e-4)


print("\nStarting SimSiam pre-training ")
model.train()
for epoch in range(1, PRETRAIN_EPOCHS + 1):
    total_loss = 0.0
    for (view1, view2), _ in train_loader:  # labels are ignored during pre-training
        # non_blocking=True lets the host-to-device copy run asynchronously when
        # the source batch is in pinned memory; it is a no-op on CPU.
        view1 = view1.to(device, non_blocking=True)
        view2 = view2.to(device, non_blocking=True)
        p1, p2, z1, z2 = model(view1, view2)
        # Symmetrised loss: each prediction is compared with the detached
        # projection of the other view.
        loss = 0.5 * (loss_fn(p1, z2) + loss_fn(p2, z1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch:3d}/{PRETRAIN_EPOCHS} | Loss: {avg_loss:.4f}")

print("Pre-training finished!\n")


In [None]:
# Tunables for the linear probe; LINEAR_EPOCHS drives both the loop and the log line.
LINEAR_EPOCHS = 20

# One normalisation transform shared by the train and test pipelines so the
# channel statistics are written down only once in this cell.
cifar_normalize = transforms.Normalize((0.4914,0.4822,0.4465),(0.2023,0.1994,0.2010))

train_sup = CIFAR10('./data', train=True, download=True,
                    transform=transforms.Compose([
                        transforms.RandomCrop(32, padding=4),
                        transforms.RandomHorizontalFlip(),
                        transforms.ToTensor(),
                        cifar_normalize,
                    ]))
# The test pipeline has no random augmentation.
test_sup = CIFAR10('./data', train=False, download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       cifar_normalize,
                   ]))

train_loader_sup = DataLoader(train_sup, batch_size=512, shuffle=True, num_workers=2, pin_memory=True)
test_loader_sup  = DataLoader(test_sup,  batch_size=512, shuffle=False, num_workers=2, pin_memory=True)

# Freeze the backbone: eval mode for its layers and no gradients for its weights.
backbone.eval()
for p in backbone.parameters():
    p.requires_grad = False

# Only this linear layer is optimised.
classifier = nn.Linear(512, 10).to(device)
opt_clf = optim.SGD(classifier.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
criterion = nn.CrossEntropyLoss()

print("Training linear classifier")
for epoch in range(LINEAR_EPOCHS):
    classifier.train()
    for x, y in train_loader_sup:
        # The loader pins host memory, so the copies can be issued asynchronously.
        x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
        # Feature extraction is explicitly excluded from autograd: the backbone is
        # frozen, and this keeps it that way even if a parameter were left trainable.
        with torch.no_grad():
            feats = backbone(x)
        loss = criterion(classifier(feats), y)
        opt_clf.zero_grad()
        loss.backward()
        opt_clf.step()
    print(f"Linear epoch {epoch+1}/{LINEAR_EPOCHS} done")

In [None]:
# Put both modules in eval mode here so this cell does not rely on an earlier
# cell having left the backbone in that state (the call is idempotent).
backbone.eval()
classifier.eval()
correct = total = 0
with torch.no_grad():
    for x, y in test_loader_sup:
        # The test loader pins host memory, so the copies can be asynchronous.
        x, y = x.to(device, non_blocking=True), y.to(device, non_blocking=True)
        # Predicted class is the index of the largest logit in each row.
        preds = classifier(backbone(x)).argmax(dim=1)
        correct += (preds == y).sum().item()
        total += y.size(0)

# Top-1 accuracy as a percentage over all samples yielded by test_loader_sup.
acc = 100 * correct / total
print(f"\nFINAL TEST ACCURACY: {acc:.2f}%")
print("You just completed your first self-supervised learning project!")