<a href="https://colab.research.google.com/github/EugeneLogvinovsky/ITHillel/blob/main/Unsupervised_Pre_training_with_Denoising_AutoEncoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import torch

from torchvision.datasets import STL10
from torchvision.transforms import CenterCrop, Compose, Normalize, RandomCrop, RandomHorizontalFlip, ToTensor
from torch.utils.data import DataLoader

from torch import nn, optim

In [3]:
# Per-channel mean/std are measured once on the raw labelled training images
# (ToTensor only: no augmentation, no normalisation).
raw_train_ds = STL10('.', split='train', folds=None, transform=ToTensor(), download=True)
# Stack to (3, N, H, W) and flatten everything except the channel axis.
X = torch.stack([raw_train_ds[i][0] for i in range(len(raw_train_ds))], 1).reshape(3, -1)
stl10_mean, stl10_std = X.mean(1), X.std(1)

# Training / pre-training pipeline: random flip plus a random 32x32 crop, so
# the network always sees 3x32x32 inputs.
train_transforms = Compose([
    RandomHorizontalFlip(),
    RandomCrop(size=32, padding=2),
    ToTensor(),
    Normalize(mean=stl10_mean, std=stl10_std)])
train_ds = STL10('.', split='train', folds=None, transform=train_transforms, download=True)
train_dl = DataLoader(train_ds, batch_size=512, shuffle=True)

# Validation must produce the same 3x32x32 shape as training: STL10 images are
# larger than 32x32, and a classifier head sized for the 64x2x2 code cannot
# consume un-cropped images. A deterministic centre crop keeps evaluation
# reproducible.
val_transforms = Compose([
    CenterCrop(32),
    ToTensor(),
    Normalize(mean=stl10_mean, std=stl10_std)])
val_ds = STL10('.', split='test', folds=None, transform=val_transforms, download=True)
val_dl = DataLoader(val_ds, batch_size=1024)

# Unlabeled split used for unsupervised pre-training (same augmentation as training).
pretrain_ds = STL10('.', split='unlabeled', folds=None, transform=train_transforms, download=True)
pretrain_dl = DataLoader(pretrain_ds, batch_size=512, shuffle=True)

print(f'Dataset size: train={len(train_ds)}, val={len(val_ds)}, pretrain={len(pretrain_ds)}')

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified
Dataset size: train=5000, val=8000, pretrain=100000


In [4]:
# Encoder: 3x32x32 image (3072 values) -> 64x2x2 code (256 values), C=12.
def _conv_relu_bn(in_channels, out_channels):
    """Return the layers [3x3 same-padding conv, ReLU, BatchNorm2d] as a list."""
    return [
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.BatchNorm2d(out_channels),
    ]


encoder = nn.Sequential(
    # input: 3x32x32 = 3072
    *_conv_relu_bn(3, 32),
    nn.MaxPool2d(2, 2),

    # 32x16x16
    *_conv_relu_bn(32, 64),
    *_conv_relu_bn(64, 64),
    nn.MaxPool2d(2, 2),

    # 64x8x8
    *_conv_relu_bn(64, 128),
    *_conv_relu_bn(128, 128),
    nn.MaxPool2d(2, 2),

    # 128x4x4
    *_conv_relu_bn(128, 128),
    *_conv_relu_bn(128, 64),
    nn.AvgPool2d(2, 2),

    # output: 64x2x2 (left un-flattened so a convolutional decoder can consume
    # it; flattening gives 64*2*2 = 256 features)
)

# Decoder: mirrors the encoder, 64x2x2 code -> 3x32x32 reconstruction.
#
# A ConvTranspose2d with kernel_size=3, stride=2, padding=1, output_padding=1
# exactly doubles the spatial size; with stride=1, padding=1 it preserves it.
# Four doubling layers are needed to undo the encoder's four 2x poolings
# (2 -> 4 -> 8 -> 16 -> 32). Un-padded stride-2 layers would instead grow
# 2 -> 5 -> 11 -> ... and never reach 32x32.
decoder = nn.Sequential(
    # 64x2x2
    nn.ConvTranspose2d(64, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
    nn.ReLU(),
    # 128x4x4
    nn.ConvTranspose2d(128, 128, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),

    # 128x4x4
    nn.ConvTranspose2d(128, 128, kernel_size=3, stride=2, padding=1, output_padding=1),
    nn.ReLU(),
    # 128x8x8
    nn.ConvTranspose2d(128, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),

    # 64x8x8
    nn.ConvTranspose2d(64, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
    nn.ReLU(),
    # 64x16x16
    nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
    nn.ReLU(),

    # 32x32x32
    nn.Conv2d(32, 3, kernel_size=3, padding=1),
    # 3x32x32
    # No Sigmoid on the output: the images fed to the network are mean/std
    # normalised, so reconstruction targets fall outside [0, 1].
)

# Autoencoder: image -> code -> reconstructed image.
model = nn.Sequential(encoder, decoder)
# Reconstruction is a regression problem (predict pixel values), so the
# objective is mean-squared error; cross-entropy is a classification loss.
loss_fn = nn.MSELoss()
# Intended use: loss = loss_fn(model(X), X); loss.backward()

In [5]:
def define_cnn_extractor(backbone=None):
    """Build the feature extractor used in front of a linear classifier head.

    The extractor is the convolutional encoder followed by ``nn.Flatten``, so
    it returns one feature vector per image (64*2*2 = 256 values for 3x32x32
    inputs with this notebook's encoder). Wrapping the whole autoencoder
    would return reconstructed images, which a ``Linear(256, ...)`` head
    cannot consume.

    Args:
        backbone: Optional ``nn.Module`` to use as the convolutional part.
            When omitted, the notebook-level ``encoder`` is used; it is shared
            by reference, not copied, so any weights it has already learned
            carry over.

    Returns:
        ``nn.Sequential(backbone, nn.Flatten())``.
    """
    if backbone is None:
        backbone = encoder
    return nn.Sequential(
        backbone,
        nn.Flatten(),
    )

In [6]:
def train(train_dl, val_dl, forward, opt, epochs=10):
    """Train a classifier with cross-entropy and report per-epoch metrics.

    Args:
        train_dl: Iterable of ``(X, Y)`` batches used for optimisation.
        val_dl: Iterable of ``(X, Y)`` batches used for evaluation only.
        forward: Callable mapping a batch ``X`` to class logits. When it is an
            ``nn.Module`` it is switched between train and eval mode around
            the two phases, and is left in eval mode on return.
        opt: Optimiser already bound to the parameters of ``forward``.
        epochs: Number of passes over ``train_dl`` (default 10).

    Returns:
        A list with one dict per epoch holding ``train_loss``, ``train_acc``,
        ``val_loss`` and ``val_acc``. Each value is the mean of the per-batch
        values (``nan`` if the corresponding loader yielded no batches).
    """
    loss_fn = nn.CrossEntropyLoss()
    is_module = isinstance(forward, nn.Module)

    def _mean(values):
        # Mean of a list of floats; nan for an empty list instead of raising.
        return sum(values) / len(values) if values else float('nan')

    history = []
    for epoch_ind in range(epochs):
        if is_module:
            forward.train()  # BatchNorm/Dropout in training behaviour
        train_loss, train_acc = [], []
        # Iterate the loader that was passed in, not a notebook-level global.
        for (X, Y) in train_dl:
            opt.zero_grad()
            y_pred = forward(X)
            loss = loss_fn(y_pred, Y)
            loss.backward()
            opt.step()
            train_loss.append(loss.item())
            train_acc.append(torch.mean((y_pred.argmax(1) == Y).float()).item())

        print(f"T[{epoch_ind}]: loss={_mean(train_loss):.6f}\tacc={_mean(train_acc):.4f}")

        if is_module:
            forward.eval()  # use running BatchNorm statistics for evaluation
        val_loss, val_acc = [], []
        # No gradients are needed for evaluation; skipping them saves memory.
        with torch.no_grad():
            for (X, Y) in val_dl:
                y_pred = forward(X)
                loss = loss_fn(y_pred, Y)
                val_loss.append(loss.item())
                val_acc.append(torch.mean((y_pred.argmax(1) == Y).float()).item())

        print(f"V[{epoch_ind}]: loss={_mean(val_loss):.6f}\tacc={_mean(val_acc):.4f}\n")

        history.append({
            'train_loss': _mean(train_loss),
            'train_acc': _mean(train_acc),
            'val_loss': _mean(val_loss),
            'val_acc': _mean(val_acc),
        })

    return history

In [7]:
# calculate STL10 mean and std
#
# The statistics must come from raw images. `train_ds` applies random crops
# and Normalize, so measuring it would overwrite the real per-channel
# statistics with those of already-normalised data. A dataset with only
# ToTensor() is therefore loaded here.
raw_train_ds = STL10('.', split='train', folds=None, transform=ToTensor(), download=True)
# Stack to (3, N, H, W) and flatten everything except the channel axis.
X = torch.stack([raw_train_ds[i][0] for i in range(len(raw_train_ds))], 1).reshape(3, -1)
stl10_mean, stl10_std = X.mean(1), X.std(1)

In [None]:
# Unsupervised pre-training on the unlabeled split.
#
# The unlabeled STL10 images carry no usable class labels (the dataset reports
# -1 for all of them), so a classifier cannot be trained on them with
# cross-entropy. Instead the autoencoder `model` is trained as a denoising
# autoencoder: Gaussian noise is added to each input and the network must
# reconstruct the clean image.
NOISE_STD = 0.1       # noise level, in units of the normalised pixel scale
PRETRAIN_EPOCHS = 10

print('Train STL10 model from scratch (unlabeled)')
opt = optim.Adam(params=model.parameters(), lr=1e-3)
reconstruction_loss = nn.MSELoss()

model.train()
for epoch_ind in range(PRETRAIN_EPOCHS):
    epoch_losses = []
    for X, _ in pretrain_dl:  # labels are ignored
        noisy_X = X + NOISE_STD * torch.randn_like(X)
        opt.zero_grad()
        loss = reconstruction_loss(model(noisy_X), X)
        loss.backward()
        opt.step()
        epoch_losses.append(loss.item())
    print(f"P[{epoch_ind}]: loss={sum(epoch_losses) / len(epoch_losses):.6f}")

# Extractor and 10-class head for the supervised stage that follows.
CNN_extractor = define_cnn_extractor()
STL10_classifier = nn.Linear(256, 10)
STL10_model = nn.Sequential(CNN_extractor, STL10_classifier)

Train STL10 model from scratch (unlabeled)


In [None]:
# define STL10 model with pre-trained cnn extractor
#
# `CNN_extractor` is deliberately NOT re-created here: it is reused from the
# previous cell so its weights carry over. Only the linear head is new.
# STL10 has 10 classes, so the head has 10 outputs.
STL10_classifier = nn.Linear(256, 10)
STL10_model = nn.Sequential(CNN_extractor, STL10_classifier)
print('Train STL10 model pre-trained model (labeled)')
opt = optim.Adam(params=STL10_model.parameters(), lr=1e-3)
train(train_dl, val_dl, STL10_model, opt)


