In [None]:
import torch
import os
from torchvision import datasets
import torchvision.transforms as transforms
import neptune.new as neptune
import torch.optim as optim
import time
import torch.nn as nn
import math
import simsiam.loader
from simsiam.builder import SimSiam
from simsiam.criterion import SimSiamLoss
from simsiam.validation import KNNValidation
from torch.backends import cudnn

Hiperparámetros a definir por el usuario

In [None]:
# User-tunable settings read by the cells below.
arch = 'mobilenet_v2' # Backbone passed to SimSiam. Options: resnet34, mobilenet_v2, alexnet, vgg13 and squeezenet1_1
dataset = 'ImageWoof' # Dataset name
train_path = 'ImageWoof/train' # Path to the dataset's training images
val_path = 'ImageWoof/val' # Path to the dataset's validation images
model_path = 'simsiam/Unsupervised_Training_Checkpoints/ImageWoof/Mobilenet/' # Path where the model will be saved
batch_size = 32
epochs = 800 # Number of epochs
lr = 0.05 # Base learning rate (scaled by batch_size / 256 before being given to the optimizer)
weight_decay = 0.0001
momentum = 0.9
gpu = 0 # Index of the GPU on which to run the network
nombre = 'ImageWoofSimSiamMobilenetV2_800epochs' # Experiment name
eval_freq = 5 # How often (in epochs) to validate with the kNN classifier

Código

In [None]:
# Report which kind of device PyTorch can use in this session.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

In [None]:
class ProgressMeter(object):
    """Prints a one-line progress report: a prefix, a batch counter and a set of meters."""

    def __init__(self, num_batches, meters, prefix=""):
        self.batch_fmtstr = self._get_batch_fmtstr(num_batches)
        self.meters = meters
        self.prefix = prefix

    def display(self, batch):
        """Print the prefix and ``[batch/total]`` counter followed by every meter, tab-separated."""
        header = self.prefix + self.batch_fmtstr.format(batch)
        print('\t'.join([header, *map(str, self.meters)]))

    def _get_batch_fmtstr(self, num_batches):
        """Build a ``[{:Nd}/total]`` template whose counter is padded to the width of the total."""
        width = len(str(num_batches // 1))
        slot = '{:%dd}' % width
        return '[{}/{}]'.format(slot, slot.format(num_batches))

In [None]:
class AverageMeter(object):
    """Keeps the latest value of a metric together with its running weighted average."""

    def __init__(self, name, fmt=':f'):
        self.name, self.fmt = name, fmt
        # reset() creates val / avg / sum / count, all starting at zero.
        self.reset()

    def reset(self):
        """Zero the latest value, the average, the weighted sum and the sample count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as the latest value, counted ``n`` times in the average."""
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

    def __str__(self):
        # Renders as "<name> <val> (<avg>)", with both numbers using self.fmt.
        template = '{{name}} {{val{0}}} ({{avg{0}}})'.format(self.fmt)
        return template.format(**vars(self))

In [None]:
def adjust_learning_rate(optimizer, init_lr, epoch, n_epochs):
    """Set each parameter group's learning rate following a half-cosine decay.

    The decayed rate is ``init_lr * 0.5 * (1 + cos(pi * epoch / n_epochs))``.
    Groups that carry a truthy ``'fix_lr'`` entry are kept at ``init_lr``;
    every other group receives the decayed rate.
    """
    cur_lr = init_lr * 0.5 * (1. + math.cos(math.pi * epoch / n_epochs))
    for group in optimizer.param_groups:
        group['lr'] = init_lr if group.get('fix_lr') else cur_lr

In [None]:
def train(train_loader, model, criterion, optimizer, epoch, print_freq=30):
    """Run one training epoch and return the average loss.

    Args:
        train_loader: sized iterable yielding ``(images, _)`` batches, where
            ``images[0]`` and ``images[1]`` are the two views of the batch;
            the second element of each batch is ignored.
        model: network called as ``model(x1=..., x2=...)`` and returning
            ``(z1, z2, p1, p2)``.
        criterion: callable mapping ``(z1, z2, p1, p2)`` to a scalar loss.
        optimizer: optimizer stepping the parameters of ``model``.
        epoch: epoch index, only used in the progress prefix.
        print_freq: print a progress line every ``print_freq`` batches
            (default 30); a falsy value disables progress printing.

    Returns:
        The loss averaged over all samples seen during the epoch.
    """
    batch_time = AverageMeter('Time', ':6.3f')
    data_time = AverageMeter('Data', ':6.3f')
    losses = AverageMeter('Loss', ':.4e')
    progress = ProgressMeter(
        len(train_loader),
        [batch_time, data_time, losses],
        prefix="Epoch: [{}]".format(epoch))

    # switch to train mode
    model.train()

    # Send the inputs to whatever device the model lives on instead of a
    # hard-coded GPU 0; otherwise a model placed on another GPU (or on the
    # CPU) fails with a device mismatch in the forward pass.
    device = next(model.parameters()).device

    end = time.time()
    for i, (images, _) in enumerate(train_loader):
        # time spent waiting for the data loader
        data_time.update(time.time() - end)

        x1 = images[0].to(device, non_blocking=True)
        x2 = images[1].to(device, non_blocking=True)

        # compute output
        z1, z2, p1, p2 = model(x1=x1, x2=x2)
        loss = criterion(z1, z2, p1, p2)
        # weight the running average by the number of samples in the batch
        losses.update(loss.item(), x1.size(0))

        # compute gradient and do SGD step
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if print_freq and i % print_freq == 0:
            progress.display(i)

    return losses.avg

In [None]:
# Augmentation pipeline applied to every training image. The random steps
# mean each call on the same image can produce a different result.
transform_train = transforms.Compose([
     # random crop covering 20%-100% of the image area, resized to 224x224
     transforms.RandomResizedCrop(224, scale=(0.2, 1.)),
     # with probability 0.8, jitter brightness/contrast/saturation (0.4) and hue (0.1)
     transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),
     transforms.RandomGrayscale(p=0.2),
     # with probability 0.5, apply the project's GaussianBlur
     # (presumably [.1, 2.] is the sigma range -- confirm in simsiam.loader)
     transforms.RandomApply([simsiam.loader.GaussianBlur([.1, 2.])], p=0.5),
     transforms.RandomHorizontalFlip(),
     transforms.ToTensor(),
     # per-channel normalization; these are the commonly used ImageNet mean/std values
     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

In [None]:
# Training images are read from the class sub-folders of train_path. Each one
# goes through TwoCropsTransform(transform_train); the training loop reads the
# result as images[0] and images[1].
train_set = datasets.ImageFolder(
    root=train_path,
    transform=simsiam.loader.TwoCropsTransform(transform_train),
)

# Shuffled batches; the last incomplete batch is dropped.
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=4,
    pin_memory=True,
)

In [None]:
# Network and loss.
model = SimSiam(arch)
criterion = SimSiamLoss()

# Scale the base learning rate by batch_size / 256.
init_lr = lr * batch_size / 256

# Two parameter groups: the 'fix_lr' flag is read by adjust_learning_rate,
# which keeps the predictor at init_lr and decays the encoder's rate.
optim_params = [
    dict(params=model.encoder.parameters(), fix_lr=False),
    dict(params=model.predictor.parameters(), fix_lr=True),
]
optimizer = optim.SGD(
    optim_params,
    lr=init_lr,
    momentum=momentum,
    weight_decay=weight_decay,
)

# Move the model and the loss to the selected GPU when one is configured.
if gpu is not None:
    torch.cuda.set_device(gpu)
    model, criterion = model.cuda(gpu), criterion.cuda(gpu)
    cudnn.benchmark = True

print(model)

In [None]:
# Main loop: train every epoch; every eval_freq epochs run the kNN validation
# and write checkpoints.
start_epoch = 0
best_acc = 0.0

# Create the checkpoint directory before training starts, so the first
# torch.save cannot fail on a missing folder after eval_freq epochs of work.
if model_path:
    os.makedirs(model_path, exist_ok=True)

validation = KNNValidation(batch_size, train_path, val_path, model)

for epoch in range(start_epoch, epochs):
    adjust_learning_rate(optimizer, init_lr, epoch, epochs)
    train_loss = train(train_loader, model, criterion, optimizer, epoch)
    if (epoch+1) % eval_freq == 0:
        print("Validando...")
        val_top1_acc = validation.eval()
        print('Top1: {}'.format(val_top1_acc))
        # keep a copy of the best model seen so far
        if val_top1_acc > best_acc:
            best_acc = val_top1_acc
            name = 'best.pth'
            torch.save(model, os.path.join(model_path, name))
        # periodic checkpoint, written at every validation epoch
        name = 'checkpoint_{}.pth'.format(epoch+1)
        torch.save(model, os.path.join(model_path, name))
print('Mejor accuracy:', best_acc)