## Assignment 6
* Write a Variational Convolutional AutoEncoder on CIFAR-10 and add denoising to the model
* Use the latent space for CIFAR-10 classification

In [None]:
import torch
import torch.nn as nn
import torch.utils as utils
import torchvision.datasets as dset
import torchvision.transforms as transforms

import torchvision
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
import os
%matplotlib inline

In [None]:
# Hyperparameters shared by the cells below.
epoch = 10               # number of passes over the training set
batch_size = 100         # mini-batch size of the training loader
learning_rate = 0.0005   # step size handed to the Adam optimizer

# Per-channel mean / std used by the normalising pipelines below.
_cifar_mean = (0.4914, 0.4822, 0.4465)
_cifar_std = (0.2023, 0.1994, 0.2010)

# Augmenting + normalising pipeline for training images.
# NOTE: transform_train / transform_test are only defined here; the datasets
# below are built with a plain ToTensor() transform, so neither is applied.
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_cifar_mean, _cifar_std),
])

# Normalising pipeline for test images (no augmentation).
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(_cifar_mean, _cifar_std),
])

# CIFAR-10 train / test splits, downloaded into ./data when missing.
trainset = torchvision.datasets.CIFAR10(
    './data',
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)
testset = torchvision.datasets.CIFAR10(
    './data',
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

# Shuffled loader for training, fixed-order loader for evaluation.
train_loader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
)
test_loader = torch.utils.data.DataLoader(
    testset,
    batch_size=100,
    shuffle=False,
    num_workers=2,
)

Files already downloaded and verified
Files already downloaded and verified


In [None]:
# Encoder 
# torch.nn.Conv2d(in_channels, out_channels, kernel_size,
#                 stride=1, padding=0, dilation=1,
#                 groups=1, bias=True)
# batch x 3 x 32 x 32 -> batch x 3 x 8 x 8

class Encoder(nn.Module):
    """Convolutional encoder that compresses an RGB image 4x along each side.

    All convolutions are 3x3 with padding 1, so only the two 2x2 max-pools
    change the spatial size. For a 32 x 32 input the shapes are:

        input   : batch x   3 x 32 x 32
        layer1  : batch x  64 x 16 x 16
        layer2  : batch x   3 x  8 x  8
    """

    @staticmethod
    def _conv_relu_bn(c_in, c_out):
        """Return the layers [3x3 conv (padding 1), ReLU, BatchNorm] as a list."""
        return [
            nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(c_out),
        ]

    def __init__(self):
        super().__init__()
        stage = self._conv_relu_bn

        # 3 -> 32 -> 32 -> 64 -> 64 channels, then halve height and width.
        self.layer1 = nn.Sequential(
            *stage(3, 32),
            *stage(32, 32),
            *stage(32, 64),
            *stage(64, 64),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # 64 -> 128 -> 128 channels, halve again, then project down to 3
        # channels (this last conv is followed by ReLU only, no BatchNorm).
        self.layer2 = nn.Sequential(
            *stage(64, 128),
            *stage(128, 128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 3, kernel_size=3, padding=1),
            nn.ReLU(),
        )

    def forward(self, x):
        """Apply layer1 then layer2 and return the resulting feature map."""
        return self.layer2(self.layer1(x))
    
# Build the encoder and move its parameters to the GPU (.cuda() needs a CUDA device).
encoder = Encoder().cuda()

In [None]:
# Channel counts at the bottleneck: HiddenLayer reads `inputSize` feature maps
# and emits `outputSize * 8 * 8` values per sample.
inputSize = 3
outputSize = 3


class HiddenLayer(nn.Module):
    """Latent block: two 512-channel 3x3 conv stages followed by a linear readout.

    Both convolutions use stride 1 and padding 1, so they keep the spatial
    size of the input. ``fc1`` expects 512 * 8 * 8 flattened features, i.e. an
    8 x 8 input map, and returns ``outputSize * 8 * 8`` values per sample.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: inputSize -> 512 channels.
        self.cnn1 = nn.Conv2d(inputSize, 512, 3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.norm1 = nn.BatchNorm2d(512)

        # Stage 2: 512 -> 512 channels.
        self.cnn2 = nn.Conv2d(512, 512, 3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.norm2 = nn.BatchNorm2d(512)

        # Readout from the flattened 512 x 8 x 8 feature map.
        self.fc1 = nn.Linear(512 * 8 * 8, outputSize * 8 * 8)

    def forward(self, x):
        """Return the flat latent vector (batch x fc1.out_features) for ``x``."""
        feats = self.norm1(self.relu1(self.cnn1(x)))
        feats = self.norm2(self.relu2(self.cnn2(feats)))

        # Collapse channels and spatial positions into one axis per sample.
        flat = feats.view(feats.size(0), -1)
        return self.fc1(flat)

# Build the latent block and move its parameters to the GPU (.cuda() needs a CUDA device).
latent = HiddenLayer().cuda()

In [None]:
# Decoder 
# torch.nn.ConvTranspose2d(in_channels, out_channels, kernel_size,
#                          stride=1, padding=0, output_padding=0,
#                          groups=1, bias=True)
# output_height = (height-1)*stride + kernel_size - 2*padding + output_padding
# batch x 3 x 8 x 8 -> batch x 3 x 32 x 32

class Decoder(nn.Module):
    """Transposed-convolution decoder that enlarges a 3-channel map 4x per side.

    Only the first and the last transposed convolutions use stride 2 (with
    output_padding 1), each doubling height and width; every other layer keeps
    the spatial size. For an 8 x 8 input the shapes are:

        input   : batch x  3 x  8 x  8
        layer1  : batch x 64 x 16 x 16
        layer2  : batch x  3 x 32 x 32
    """

    @staticmethod
    def _deconv_relu_bn(c_in, c_out, stride=1, output_padding=0):
        """Return [3x3 transposed conv (padding 1), ReLU, BatchNorm] as a list."""
        return [
            nn.ConvTranspose2d(c_in, c_out, 3, stride, 1, output_padding),
            nn.ReLU(),
            nn.BatchNorm2d(c_out),
        ]

    def __init__(self):
        super().__init__()
        stage = self._deconv_relu_bn

        # 3 -> 128 (upsample x2) -> 128 -> 64 -> 64 channels.
        self.layer1 = nn.Sequential(
            *stage(3, 128, stride=2, output_padding=1),
            *stage(128, 128),
            *stage(128, 64),
            *stage(64, 64),
        )

        # 64 -> 32 -> 32 channels, then upsample x2 back to 3 channels.
        # The final ReLU keeps the reconstruction non-negative.
        self.layer2 = nn.Sequential(
            *stage(64, 32),
            *stage(32, 32),
            nn.ConvTranspose2d(32, 3, 3, 2, 1, 1),
            nn.ReLU(),
        )

    def forward(self, x):
        """Apply layer1 then layer2 and return the reconstructed image batch."""
        return self.layer2(self.layer1(x))

# Build the decoder and move its parameters to the GPU (.cuda() needs a CUDA device).
decoder = Decoder().cuda()

In [None]:
# Sanity-check the encoder -> latent -> decoder wiring on one training batch
# and print the shape of the reconstruction.
# Only the shape is inspected, so no autograd graph is needed.
with torch.no_grad():
    for image, label in train_loader:
        image = image.cuda()
        output = encoder(image)
        output = latent(output)
        # Reshape the flat latent vector back into a 3 x 8 x 8 map. The batch
        # length is read from the data rather than the global batch_size so a
        # short batch does not break the reshape.
        output = output.view(image.size(0), 3, 8, 8)
        output = decoder(output)
        print(output.size())
        break

torch.Size([100, 3, 32, 32])


In [None]:
# Objective and optimizer for the autoencoder.
# The reconstruction produced by the decoder is compared to the target image
# with mean squared error. A single Adam optimizer updates all three modules,
# so their parameters are gathered into one list.
parameters = [
    *encoder.parameters(),
    *latent.parameters(),
    *decoder.parameters(),
]
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(parameters, lr=learning_rate)

Train Autoencoder 

In [None]:
# Train the denoising autoencoder, resuming from a saved checkpoint if present.
os.makedirs('./model', exist_ok=True)

# Restore previous weights, if any.
# The checkpoint file holds the pickled encoder/decoder modules. Their weights
# are copied into the EXISTING modules instead of rebinding the names: the
# optimizer was created from the existing modules' parameters, so rebinding
# `encoder` / `decoder` to freshly loaded objects would leave the optimizer
# updating modules that no longer take part in the forward pass.
# NOTE: torch.load unpickles arbitrary objects -- only load trusted files.
try:
    saved_encoder, saved_decoder = torch.load('./model/deno_autoencoder3.pkl')
    saved_latent_state = torch.load('./model/deno_latent_parameters2.pkl')
    encoder.load_state_dict(saved_encoder.state_dict())
    decoder.load_state_dict(saved_decoder.state_dict())
    latent.load_state_dict(saved_latent_state)
    print("\n--------model restored--------\n")
except Exception as err:
    # Best effort: a missing or incompatible checkpoint means training starts
    # from the current weights. Report why instead of hiding the error.
    print("\n--------model not restored--------\n")
    print("reason:", err)

for i in range(epoch):
    for image, label in train_loader:
        # Corrupted input: (image + 0.25) scaled element-wise by uniform noise
        # drawn from [0, 0.1).
        # NOTE(review): this is multiplicative and shrinks the input a lot --
        # confirm it is the intended noise model.
        image_n = torch.mul(image + 0.25, 0.1 * torch.rand_like(image))
        image = image.cuda()
        image_n = image_n.cuda()
        optimizer.zero_grad()
        output = encoder(image_n)
        output = latent(output)
        # Use the real batch length so a short final batch still reshapes.
        output = output.view(image.size(0), 3, 8, 8)
        output = decoder(output)
        # The target is the clean image, not the corrupted one.
        loss = loss_func(output, image)
        loss.backward()
        optimizer.step()
    # Reports the loss of the last mini-batch of the epoch only.
    print('epoch [{}/{}], loss:{:.4f}'
          .format(i + 1, epoch, loss.item()))

# Same checkpoint layout as read above: whole encoder/decoder modules in one
# file, the latent block's state_dict in another.
torch.save([encoder, decoder], './model/deno_autoencoder3.pkl')
torch.save(latent.state_dict(), './model/deno_latent_parameters2.pkl')
print(loss)


--------model restored--------

epoch [1/10], loss:0.0071
epoch [2/10], loss:0.0072
epoch [3/10], loss:0.0070
epoch [4/10], loss:0.0073
epoch [5/10], loss:0.0071
epoch [6/10], loss:0.0074
epoch [7/10], loss:0.0070
epoch [8/10], loss:0.0072
epoch [9/10], loss:0.0070
epoch [10/10], loss:0.0074
tensor(0.0074, device='cuda:0', grad_fn=<MseLossBackward>)


  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "


Use the latent layer (as transfer learning) for CIFAR-10 dataset classification

In [None]:
# Build a CIFAR-10 classifier from the trained latent block (transfer learning).
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Start from the latent weights saved by the autoencoder training cell.
# map_location lets a checkpoint written on the GPU load on a CPU-only machine.
latent2 = HiddenLayer()
latent2.load_state_dict(
    torch.load('./model/deno_latent_parameters2.pkl', map_location=device)
)

# Swap the readout for a 10-class head. The classifier is fed 32 x 32 images
# directly and the 3x3 / padding-1 convolutions keep that size, so the
# flattened feature map has 512 * 32 * 32 entries.
latent2.fc1 = nn.Linear(512*32*32, 10)

# Move the whole model (including the new head) before creating the optimizer.
latent2 = latent2.to(device)

# All parameters are optimized, not only the new head.
# NOTE: this rebinds `optimizer`, replacing the autoencoder's Adam optimizer.
optimizer = torch.optim.SGD(latent2.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)

criterion = nn.CrossEntropyLoss()

# Training
def train(epoch):
    """Train ``latent2`` for one pass over ``train_loader``.

    Uses the module-level ``latent2``, ``train_loader``, ``device``,
    ``optimizer`` and ``criterion``.

    Args:
        epoch: Epoch index; only used for the progress line that is printed.

    Returns:
        A ``(mean_loss, accuracy)`` tuple: the loss averaged over batches and
        the training accuracy in percent. ``(0.0, 0.0)`` if the loader yields
        no batches.
    """
    print('\nEpoch: %d' % epoch)
    latent2.train()
    train_loss = 0.0
    correct = 0
    total = 0
    num_batches = 0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = latent2(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        # Predicted class = index of the largest logit per sample.
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        num_batches += 1

    if num_batches == 0:
        return 0.0, 0.0
    return train_loss / num_batches, 100. * correct / total

def test(epoch):
    """Evaluate ``latent2`` on ``test_loader`` and print the accuracy.

    Uses the module-level ``latent2``, ``test_loader`` and ``device``. No
    checkpoint is written.

    Args:
        epoch: Epoch index; accepted for symmetry with ``train`` and unused.

    Returns:
        The test accuracy in percent (``0.0`` if the loader yields no samples).
    """
    latent2.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = latent2(inputs)

            # Predicted class = index of the largest logit per sample.
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    # Guard against an empty loader instead of dividing by zero.
    acc = 100.*correct/total if total else 0.0
    print("accuracy = ", acc,"%"  )
    return acc

# Driver: alternate one training epoch and one evaluation pass.
count = 0  # not read anywhere in this cell
num_epochs = 10  # number of train/test rounds

for epoc in range(num_epochs):
    train(epoc)
    test(epoc)


Epoch: 0
accuracy =  52.78 %

Epoch: 1
accuracy =  60.0 %

Epoch: 2
accuracy =  60.8 %

Epoch: 3
accuracy =  59.88 %

Epoch: 4
accuracy =  60.71 %

Epoch: 5
accuracy =  60.44 %

Epoch: 6
accuracy =  61.91 %

Epoch: 7
accuracy =  59.55 %

Epoch: 8
accuracy =  61.16 %

Epoch: 9
accuracy =  61.38 %
