In [115]:
import torch
import torchvision
import torchvision.transforms as transforms
import os

import torchvision as tv
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torchvision.utils import save_image
from torch.utils.data import random_split

### Import dataset and put into dataloader

In [116]:
# import CIFAR-10 dataset into trainset, testset
# load in to trainloader, testloader

# color dataset
transform_color = transforms.Compose([
        transforms.ToTensor(),
         transforms.Normalize((0.5), (0.5))
    ])

dataset_color = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=False, transform=transform_color)

batchSize = 64

val_size = 5000

train_size = len(dataset_color) - val_size

trainset_color, validset_color = random_split(dataset_color, [train_size, val_size])

trainloader_color = torch.utils.data.DataLoader(trainset_color, batch_size=batchSize,
                                          shuffle=True, num_workers=4)
validloader_color = torch.utils.data.DataLoader(validset_color, batch_size=batchSize,
                                          shuffle=True, num_workers=4)

testset_color = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=False, transform=transform_color)
testloader_color = torch.utils.data.DataLoader(testset_color, batch_size=batchSize,
                                         shuffle=False, num_workers=4)
# print the size of train, valid, test datasets
print("training dataset: ", len(trainset_color))
print("validation dataset: ", len(validset_color))
print("testing dataset: ", len(testset_color))


# Number of GPUs available. Use 0 for CPU mode.
ngpu = 1
# Decide which device we want to run on
device = torch.device("cuda:0" if (torch.cuda.is_available() and ngpu > 0) else "cpu")
print(device)

training dataset:  45000
validation dataset:  5000
testing dataset:  10000
cuda:0


### Pre-processing: Convert all rgb images to gray images

In [117]:
import matplotlib.pyplot as plt
import numpy as np
import cv2

# functions to show an image
def imshow(img, name=""):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.savefig('plot/AE_Conv_500/'+name+"batch_"+str(batchSize)+'.png', dpi=1000)
    plt.show()

if not os.path.exists('./plot/AE_Conv_500'):
    os.makedirs('./plot/AE_Conv_500')
# get some random training images
dataiter = iter(trainloader_color)
images_color, labels = dataiter.next()

In [None]:
# function to convert a rgb_img to gray_img
def rgb2gray(img):
    rgb_img = img.transpose(1,2,0)
    gray_img = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY)
    return gray_img

# function to convert a batch of rgb images to gray images
def rgb2gray_batch(images):
    rgb_images = np.array(images)    
    
    gray_images = []
    for i in range(rgb_images.shape[0]):
        img = rgb_images[i]
        gray_img = rgb2gray(img)
        gray_images.append(gray_img)
    
    gray_images = np.array(gray_images)
    gray_images = torch.from_numpy(gray_images)
    return gray_images
        
images_gray = rgb2gray_batch(images_color)

# show images
images_gray = images_gray.reshape(batchSize,1,32,32)

imshow(torchvision.utils.make_grid(images_color), name="trainData_color_")
imshow(torchvision.utils.make_grid(images_gray), name="trainData_gray_")
print("images_gray shape: ", images_gray.shape)

### Define the Auto-Encoder network
The shape of a convolutional layer depends on the supplied values of kernel_size, input_shape, padding, and stride. Let's define a few variables:

K - the number of filters in the convolutional layer\
F - the height and width of the convolutional filters\
S - the stride of the convolution\
P - the padding\
W_in - the width/height (square) of the previous layer

Notice that K = out_channels, F = kernel_size, and S = stride. Likewise, W_in is the first and second value of the input_shape tuple.

The depth of the convolutional layer will always equal the number of filters K.

The spatial dimensions of a convolutional layer can be calculated as: (W_in−F+2P)/S+1

In [None]:
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder,self).__init__()

        self.encoder = nn.Sequential(
            nn.Conv2d(1,4, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(4,8,kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(8,16,kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(16,32,kernel_size=3, padding=1),
            nn.ReLU()
        )
        self.decoder = nn.Sequential(             
            nn.ConvTranspose2d(32,16,kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(16,8,kernel_size=3, padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(8,3,kernel_size=3, padding=1),
            nn.Tanh()
        )
    def forward(self,x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

### Define the loss function and optimizer

In [None]:
# define loss function and optimizer
model = Autoencoder().to(device)
print("GPU: ", next(model.parameters()).is_cuda)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),weight_decay=1e-5)


model_path = "./saved_model/AE_Conv_Train_"+str(batchSize)+ ".pt"
if os.path.exists(model_path):
    print("Continue training from the saved model")
    model.load_state_dict(torch.load(model_path))

### Tensorboard to save all training output logs

In [None]:
from torch.utils.tensorboard import SummaryWriter

# Writer will output to ./runs/ directory by default
writer = SummaryWriter("./runs/AE_Conv_Train_"+str(batchSize)+"/")

### Train the network

In [None]:
for epoch in range(25):  # loop over the dataset multiple times
    train_running_loss = 0.0
    valid_running_loss = 0.0
    
    ###################
    # train the model #
    ###################
    model.train()
    for i, data_color in enumerate(trainloader_color, 0):
        # get the inputs; data is a list of [inputs, labels]
        images_color, labels_color = data_color
        images_gray = rgb2gray_batch(images_color)
        
        # put data in gpu/cpu
        images_color = images_color.to(device)
        images_gray = images_gray.to(device)
        
        # zero the parameter gradients
        optimizer.zero_grad()

        # Forward
        # Size - images_gray: [batchSize,1,32,32]
        images_gray = images_gray.reshape(images_color.shape[0],1,32,32)
        outputs = model(images_gray)
    
        # Backward
        # Size - images_color: [batchSize,3,32,32]
        # Size - outputs: [batchSize,3,32,32]
        loss = criterion(outputs, images_color)
        loss.backward()
        optimizer.step()

        # print output statistics
        train_running_loss += loss.item()
        
        j = 400              # print every j mini-batches
        if i % j == (j-1):     # print every j mini-batches
            print('[%d, %5d] train loss: %.3f' %
                  (epoch + 1, i + 1, train_running_loss / j))
            
            writer.add_scalar('Loss/train', train_running_loss/j, len(trainloader_color)*epoch + i)
            train_running_loss = 0.0
        
    ###################
    # validate the model #
    ###################
    model.eval()
    with torch.no_grad():
        for i, data_color in enumerate(validloader_color, 0):
            # get the inputs; data is a list of [inputs, labels]
            images_color, labels_color = data_color
            images_gray = rgb2gray_batch(images_color)

            # put data in gpu/cpu
            images_color = images_color.to(device)
            images_gray = images_gray.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # Forward
            # Size - images_gray: [batchSize,1,32,32]
            images_gray = images_gray.reshape(images_color.shape[0],1,32,32)
            outputs = model(images_gray)

            # Backward
            # Size - images_color: [batchSize,3,32,32]
            # Size - outputs: [batchSize,3,32,32]
            valid_loss = criterion(outputs, images_color)

            # print output statistics
            valid_running_loss += valid_loss.item()
    
        print('[%d, %5d] validation loss: %.3f' %
              (epoch + 1, i + 1, valid_running_loss/(i+1)))

        writer.add_scalar('Loss/valid', valid_running_loss/(i+1), len(trainloader_color)*epoch + i)
        valid_running_loss = 0.0
    

writer.flush()
writer.close()
print('Finished Training')

### Inference on train dataset

In [None]:
# get some random training images
dataiter = iter(validloader_color)
images_color, labels_color = dataiter.next()
print("images_color: ", images_color.shape)
# show images
images_color_show = images_color.reshape(batchSize,3,32,32)
imshow(torchvision.utils.make_grid(images_color_show.detach()))

images_gray = rgb2gray_batch(images_color)
print("images_gray: ", images_gray.shape)
# show images
images_gray = images_gray.reshape(batchSize,1,32,32)
imshow(torchvision.utils.make_grid(images_gray.detach()))

# run inference on the network
# oututs [4,3072]
images_gray = images_gray.to(device)
outputs = model(images_gray)
print("outputs: ", outputs.shape)

images_color = outputs.reshape(batchSize,3,32,32)
images_color = images_color.to("cpu")
imshow(torchvision.utils.make_grid(images_color.detach()))

images_gray = images_gray.to("cpu")
images_gray = torch.cat((images_gray, images_gray, images_gray), 1)
final_result_display = torch.cat((images_color_show, images_gray, images_color), 0)
imshow(torchvision.utils.make_grid(final_result_display.detach()), name="result_onTrainData_")

### Save the model

In [None]:
if not os.path.exists('./saved_model'):
    os.makedirs('./saved_model')

torch.save(model.state_dict(), model_path)

### Inference on test dataset

In [None]:
# get some random training images
dataiter = iter(testloader_color)
images_color, labels_color = dataiter.next()
print("images_color: ", images_color.shape)
# show images
images_color_show = images_color.reshape(batchSize,3,32,32)
imshow(torchvision.utils.make_grid(images_color_show.detach()))

images_gray = rgb2gray_batch(images_color)
print("images_gray: ", images_gray.shape)
# show images
images_gray = images_gray.reshape(batchSize,1,32,32)
imshow(torchvision.utils.make_grid(images_gray.detach()))

# run inference on the network
images_gray = images_gray.to(device)
outputs = model(images_gray)
outputs_color = outputs.reshape(batchSize,3,32,32)
print(outputs_color.shape)

outputs_color = outputs.to("cpu")
imshow(torchvision.utils.make_grid(outputs_color.detach()))

images_gray = images_gray.to("cpu")
images_gray = torch.cat((images_gray, images_gray, images_gray), 1)

final_result_display = torch.cat((images_color_show, images_gray, outputs_color), 0)
imshow(torchvision.utils.make_grid(final_result_display.detach()),name="result_onTestData_")

### Test Error

In [None]:
###################
# test the model #
###################
model.eval()
with torch.no_grad():
    test_running_loss = 0.0
    for i, data_color in enumerate(testloader_color, 0):
        # get the inputs; data is a list of [inputs, labels]
        images_color, labels_color = data_color
        images_gray = rgb2gray_batch(images_color)

        # put data in gpu/cpu
        images_color = images_color.to(device)
        images_gray = images_gray.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # Forward
        # Size - images_gray: [batchSize,1,32,32]
        images_gray = images_gray.reshape(images_color.shape[0],1,32,32)
        outputs = model(images_gray)

        # Backward
        # Size - images_color: [batchSize,3,32,32]
        # Size - outputs: [batchSize,3,32,32]
        test_loss = criterion(outputs, images_color)

        # print output statistics
        test_running_loss += test_loss.item()

    print('test loss: %.3f' % (test_running_loss/(i+1)) )