### Importing libraries

In [1]:
import cv2 as cv
import os
import glob
import torch
import numpy as np
from torchvision import transforms
from shutil import copy2
from torchvision import datasets
from torch.utils.data import DataLoader, ConcatDataset
import torch.nn.functional as F
import torch.nn as nn
from torch import optim
from skimage.color import rgb2lab, rgb2gray, lab2rgb
import re
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from itertools import product

#### converting the default data type to float 32

In [2]:
torch.set_default_tensor_type('torch.FloatTensor')

### Task1 : Loading the data

In [3]:
img_dir = "C:/Users/aashish/Desktop/DLCG2/face_images/"
files = glob.glob("C:/Users/aashish/Desktop/DLCG2/face_images/*.jpg")


print("Length of given list:", len(files))
train_data = "C:/Users/aashish/Desktop/DLCG2/train/tensor/"
test_data = "C:/Users/aashish/Desktop/DLCG2/test/tensor/"

os.makedirs(train_data, exist_ok = True)
os.makedirs(test_data, exist_ok = True)

num_images = len(next(os.walk(img_dir))[2])
print("number of images:", num_images)

for i, file in enumerate(os.listdir(img_dir)):
    if i < (0.1*num_images):
        copy2(img_dir + file, test_data + file)
        continue
    else:
        copy2(img_dir + file, train_data + file)

print("Training Set Size:", len(next(os.walk(train_data))[2]))
print("Testing Set Size:", len(next(os.walk(test_data))[2]))


training_data = glob.glob('C:/Users/aashish/Desktop/DLCG2/train/tensor/*.jpg')
testing_data = glob.glob('C:/Users/aashish/Desktop/DLCG2/test/tensor/*.jpg')

print(len(training_data))
print(len(testing_data))
# # for f1 in files:
# #     img = cv.imread(f1)
# #     img = cv.cvtColor(img, cv.COLOR_BGR2RGB) #converting to RGB
# #     data.append(img)
# # data_tensor = torch.tensor(data).permute(0,3,1,2) #Converting nparray to tensor and in desired order
# # print (img.shape)
# print (data_tensor.shape)

Length of given list: 750
number of images: 750
Training Set Size: 675
Testing Set Size: 75
675
75


#### class to return augmented datasets

In [4]:
class AugmentedImageDataset(datasets.ImageFolder):
    def __getitem__(self,index):
        global channel_a, channel_b, img_gray
        path, target = self.imgs[index]
        img = self.loader(path)
        if self.transform is not None:
            img = self.transform(img)
            
        original_image = np.asarray(img)
        
        img_lab = rgb2lab(original_image)
        img_lab = img_lab + 128
        img_lab = img_lab / 255
        
        channel_a = img_lab[:, :, 1:2]
        channel_a = torch.from_numpy(channel_a.transpose((2,0,1))).float()
        
        channel_b = img_lab[:, :, 2:3]
        channel_b = torch.from_numpy(channel_b.transpose((2,0,1))).float()
        
        img_gray = rgb2gray(original_image)
        img_gray = torch.from_numpy(img_gray).unsqueeze(0).float()
        
        return channel_a, channel_b, img_gray
    
    
class AugmentedImageDataset_RELU(datasets.ImageFolder):
    def __getitem__(self,index):
        global channel_a, channel_b, img_gray
        path, target = self.imgs[index]
        img = self.loader(path)
        if self.transform is not None:
            img = self. transform(img)
        original_image = np.asarray(img)
        
        img_lab = rgb2lab(original_image)
        channel_a = img_lab[:, :, 1:2]
        channel_a = torch.from_numpy(channel_a.transpose((2,0,1))).float()
        
        channel_b = img_lab[:, :, 2:3]
        channel_b = torch.from_numpy(channel_b.transpose((2,0,1))).float()
        
        img_gray = rgb2gray(original_image)
        img_gray = torch.from_numpy(img_gray).unsqueeze(0).float()
        
        return channel_a, channel_b, img_gray
        
        

#### augmenting the dataset and loading into 10* tensor

In [5]:
transform = transforms.Compose([transforms.Resize(128),
                               transforms.RandomHorizontalFlip(),
                               transforms.RandomResizedCrop(128),
#                                 transforms.ToTensor(), 
#                                        transforms.Normalize([0.6, 0.6, 0.6], [0.6, 0.6, 0.6])
                               ])

train_dataset =[]
train_dataset.append(AugmentedImageDataset_RELU('C:/Users/aashish/Desktop/DLCG2/train'))
for i in range(9):
    train_dataset.append(AugmentedImageDataset_RELU('C:/Users/aashish/Desktop/DLCG2/train', transform))
    
augmented_dataset = ConcatDataset(train_dataset)
print(len(augmented_dataset))
print(len(train_dataset))

6750
10


#### loading all the data into tensors

In [6]:
train_args = dict(shuffle = True, batch_size = 32)
# if cuda:
#     train_args = dict(shuffle = True, batch_size = 8, num_workers = 1, pin_memory = True)
    
augmented_batch_train = DataLoader(dataset = augmented_dataset, **train_args)
augmented_batch_test = DataLoader(dataset = AugmentedImageDataset('C:/Users/aashish/Desktop/DLCG2/test')) 
#not applying transforms on test data

### Invoking GPU

In [7]:
device = torch.device("cuda:0" if torch.cuda.is_available()else  "cpu")
is_cuda_available = True if torch.cuda.is_available() else False
if is_cuda_available:
    num_workers = 8
else:
    num_workers = 0
# print(device)
print(format(device))

cuda:0


### building the Regressor

In [8]:
class Regressor(nn.Module):
    def __init__(self, in_channel=1, hidden_channel=3, out_dims=2, train_mode = "regressor"):
        super(Regressor,self).__init__()
        self.train_mode = train_mode
        
        self.features = nn.Sequential(
            nn.Conv2d(in_channels = in_channel, out_channels = 32, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            
            nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            
            nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            
            nn.Conv2d(in_channels =128, out_channels = 256, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            
            nn.Conv2d(in_channels = 256, out_channels = 256, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            
            nn.Conv2d(in_channels = 256, out_channels = 512, kernel_size=2, stride=2,padding=0),
            nn.BatchNorm2d(512),
            nn.ReLU(),
        )
        
        if self.train_mode == "regressor":
            self.lin = nn.Linear(in_features=512 * 2 * 2, out_features=out_dims)
            
    def forward(self, x):
        features = self.features(x)
        if self.train_mode == "regressor":
            y = torch.sigmoid(self.lin(features.reshape(-1,512*2*2)))
            return y
        else:
            return features

In [9]:
class Colorizer(nn.Module):
    def __init__(self, in_channel=3, hidden_channel=3, out_channel=2,activation_function = "sigmoid"):
        super(Colorizer, self).__init__()
        self.activation_function = activation_function
        self.features = Regressor(in_channel=1, hidden_channel = 3, out_dims=2, train_mode="colorizer")
        
        self.up_sampling = nn.Sequential(
            nn.ConvTranspose2d(in_channels=512, out_channels = 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            
            nn.ConvTranspose2d(in_channels=256, out_channels = 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            
            nn.ConvTranspose2d(in_channels=256, out_channels = 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            
            nn.ConvTranspose2d(in_channels=128, out_channels = 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            
            nn.ConvTranspose2d(in_channels=64, out_channels = 32, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            
            nn.ConvTranspose2d(in_channels=32, out_channels = out_channel, kernel_size=4, stride=2, padding=1)
            )
        
    def forward(self, x):
        if self.activation_function == "sigmoid":
            return torch.sigmoid(self.up_sampling(self.features(x)))
        elif self.activation_function == "tanh":
            return torch.tanh(self.up_sampling(self.features(x)))
        elif self.activation_function == "relu":
            return torch.relu(self.up_sampling(self.features(x)))

In [10]:
def get_ab_mean(a_channel, b_channel):
    a_channel_mean = a_channel.mean(dim = (2,3))
    b_channel_mean = b_channel.mean(dim=(2,3))
    a_b_mean = torch.cat([a_channel_mean,b_channel_mean], dim = 1)
    
    return a_b_mean

In [11]:
def to_rgb(gray_input, ab_input, save_path = None, save_name = None, device = "cpu"):
    plt.clf()
    color_image = torch.cat((gray_input, ab_input), 0).numpy()
    color_image = color_image.transpose((1,2,0))
    
    color_image[:, :, 0:1] = color_image[:, :, 0:1] * 100
    color_image[:, :, 1:3] = color_image[:, :, 1:3] * 255 - 128
    color_image = lab2rgb(color_image.astype(np.float64))
    gray_input = gray_input.squeeze().numpy()
    if save_path is not None and save_name is not None:
        plt.imsave(arr=gray_input, fname='{}{}'.format(save_path['grayscale'], save_name), cmap = 'gray')
        plt.imsave(arr=color_image, fname = '{}{}'.format(save_path['colorized'], save_name))

In [12]:
def display_image(gray, orig, new, fig_name):
    plt.clf()
    f = plt.figure()
    f.add_subplot(1, 3, 1)
    plt.imshow(mpimg.imread(gray))
    plt.axis('off')
    f.add_subplot(1, 3, 2)
    plt.imshow(mpimg.imread(orig))
    plt.axis('off')
    f.add_subplot(1, 3, 3)
    plt.imshow(mpimg.imread(new))
    plt.axis('off')
    
    plt.draw()
    plt.savefig(fig_name, dpi=220)
    plt.clf()
    plt.close()

In [13]:
class EarlyStopping_DCN:
    def __init__(self, patience = 7, verbose = False, delta = 0, model_path = None, trace_func = print):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.Inf
        self.delta = delta
        self.model_path = model_path
        self.trace_func = trace_func

In [14]:
def Hyperparameters():
    parameters = dict(lr=[0.001],
                     weight_decay=[1e-5],
                     epoch=[100])
    hyperparam = [i for i in parameters.values()]
    return hyperparam

In [15]:
class Reg:
    def reg_train(self, augmented_dataset_batch, device):
        print("...Regressor training Started...")
        model = Regressor(in_channel = 1, hidden_channel = 3, out_dims = 2, train_mode = "regressor").to(device)

        lossF = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr = 0.0001, weight_decay = 1e-4)

        loss_train = []

        #start training
        epochs = 100
        for epoch in range(epochs):
            total_loss = 0
            model.train()

            for batch in augmented_dataset_batch:
                l_channel, a_channel, b_channel = batch
                l_channel = l_channel.to(device)

                a_b_mean = get_ab_mean(a_channel, b_channel)
                a_b_mean_hat = model(l_channel)

                if torch.cuda.is_available():
                    loss = lossF(a_b_mean_hat.float().cuda(),a_b_mean.float().cuda()).to(device)
                else:
                    loss = Lossf(a_b_mean_hat.float(), a_b_mean.float()).to(device)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            print("epoch: {0}, loss: {1}".format(epoch, total_loss))
            loss_train.append(total_loss)

        # plotting loss/ epoch graph
        plt.ion()
        fig = plt.figure()
        plt.plot(loss_train)
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.show()
        plt.draw()
        plt.savefig('C:/Users/aashish/Desktop/DLCG2/Graphs', dpi = 200)
        plt.clf()
        torch.save(model.state_dict(), "C:/Users/aashish/Desktop/DLCG2/Graphs/Reg.pth")
        
    def reg_test(self, augmented_dataset_batch, device):
        print("<<Regressor testing started>>")
        model = Regressor(in_channel= 1, hidden_channel=3, out_dims = 2, train_mode = "regressor").to(device)
        model.load_state_dict(torch.load("C:/Users/aashish/Desktop/DLCG2/Graphs/Reg.pth", map_location=device))
        
        a_list = []
        b_list = []
        lossF = nn.MSELoss()
        total_loss = 0
        loss_test = []
        for batch in augmented_dataset_batch:
            l_channel, a_channel, b_channel = batch
            l_channel = l_channel.to(device)
            
            a_b_mean = get_ab_mean(a_channel, b_channel)
            a_b_mean_hat = model(l_channel).detach()
            
            if torch.cuda.is_available():
                loss = lossF(a_b_mean_hat.float().cuda(), a_b_mean.float().cuda()).to(device)
            else:
                loss = lossF(a_b_mean_hat.float(), a_b_mean.float()).to(device)
            loss_test.append(loss.item())
            
            a_b_pred = a_b_mean_hat[0].cpu().numpy()
            a_list.append(a_b_pred[0])
            b_list.append(a_b_pred[1])
        print("MSE:", np.average(np.asarray(loss_test)))
        print("Num_Image || Mean a || Mean b")
        for i in range(1, len(a_list)):
            print("Image:{0} mean_a: {1} mean_b:{2}".format(i, (a_list[i] * 255)-128, (b_list[i]*255)-128))
            
    def train_colorizer(self, augmented_dataset_batch, activation_function, model_name, device):
        print("...Activation Function...", activation_function)
        print("...colorizer training started...")
        
        
        parameters = Hyperparameters()
        for lr, weight_decay, epoch in product(*parameters):
            print("Epoch: {0}, lr: {1}, weight decay:{2}".format(epoch, lr, weight_decay))
            model = Colorizer(in_channel=3, hidden_channel=3, activation_function = activation_function).to(device)
        
            lossF = nn.MSELoss()
            optimizer = optim.Adam(model.parameters(), lr =lr, weight_decay = weight_decay)
#             saved_model_path = model_name.format(epoch, lr, weight_decay)
            loss_train = []
            early_stopping = EarlyStopping_DCN(patience = 50, verbose=True, model_path="C:/Users/aashish/Desktop/DLCG2/Graphs/color.pth")
#             epochs = 100
            for epoch in range(epoch):
                total_train_loss = 0
                total_val_loss = 0
                model.train()
                
                for batch in augmented_dataset_batch:
                    channel_l, channel_a, channel_b = batch
                    channel_l = channel_l.to(device)
                    
                    channel_a_b = torch.cat([channel_a, channel_b], dim = 1)
                    channel_a_b_hat = model(channel_l)
                    
                    if torch.cuda.is_available():
                        loss = lossF(channel_a_b_hat.float().cuda(), channel_a_b.float().cuda()).to(device)
                    else:
                        loss = lossF(channel_a_b_hat.float(), channel_a_b.float()).to(device)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    
                    total_train_loss += loss.item()
                    
                print("epoch:{0}, loss:{1}".format(epoch, total_train_loss))
                loss_train.append(total_train_loss)
                
                
                if early_stopping.early_stop:
                    print("Early Stop")
                    break
                    
                # plotting loss/ epoch graph
            plt.ion()
            fig = plt.figure()
            plt.plot(loss_train)
            plt.xlabel('Epochs')
            plt.ylabel('Loss')
            plt.show()
            plt.draw()
            plt.savefig('C:/Users/aashish/Desktop/DLCG2/Graphs/loss_plot_path.jpeg', dpi = 220)
            plt.clf()
            torch.save(model.state_dict(), "C:/Users/aashish/Desktop/DLCG2/Graphs/color.pth")
        
        
    def test_colorizer(self, augmented_dataset_batch, activation_function, save_path, model_name, device):
        parameters = Hyperparameters()
        for lr, weight_decay, epoch in product(*parameters):
#             print("------")
            print("Epoch : {0}, lr: {1}, Weight_decay: {2}".format(epoch, lr, weight_decay))
            
#             saved_model_path = model_name.format(epoch, lr, weight_decay)
                
            print(activation_function)
            print("--- Colorizer Testing Started ---")
            model = Colorizer(in_channel=3, hidden_channel = 3, activation_function = activation_function).to(device)
            model.load_state_dict(torch.load("C:/Users/aashish/Desktop/DLCG2/Graphs/color.pth", map_location = device))
                
            lossF = nn.MSELoss()
            num = 0
            for batch in augmented_dataset_batch:
                num += 1
                channel_l, channel_a, channel_b = batch
                channel_l = channel_l.to(device)
                    
                channel_a_b = torch.cat([channel_a, channel_b], dim=1)
                channel_a_b_hat = model(channel_l).detach()
                
                if torch.cuda.is_available():
                    loss = lossF(channel_a_b_hat.float().cuda(), channel_a_b.float().cuda()).to(device)
                else:
                    loss = lossF(channel_a_b_hat.float(), channel_a_b.float()).to(device)
                    
                print("Image: {0}, loss: {1}".format(num, loss.item()))
                
                
                save_original = 'Orig_img_epoch_{0}_lr_{1}_wt_decay_{2}_num_{3}.jpg' \
                    .format(epoch, lr, weight_decay, num)
                save_new = 'new_img_epoch_{0}_lr_{1}_wt_decay_{2}_num_{3}.jpg' \
                    .format(epoch, lr, weight_decay, num)
                
                to_rgb(channel_l[0].cpu(), channel_a_b[0].cpu(),
                    save_path = save_path, save_name = save_original, device = device)
                to_rgb(channel_l[0].cpu(), channel_a_b[0].cpu(),
                    save_path = save_path, save_name = save_new, device = device)
                
        self.display_imgs(epoch, lr, weight_decay, save_path)
        
        
    @staticmethod
    def display_imgs(epoch, lr, weight_decay, save_path):
        color_path = save_path['colorized']
        gray_path = save_path['grayscale']
            
            
        for i in range(7,70,7):
            title = "".\
                format(epoch, lr, weight_decay, i)
            save_original = 'orig_img_epoch_{0}_lr_{1}_wt_decay_{2}_num_{3}.jpg' \
                   .format(epoch, lr, weight_decay, i)
            save_new = 'new_img_epoch_{0}_lr_{1}_wt_decay_{2}_num_{3}.jpg' \
                    .format(epoch, lr, weight_decay, i)
            display_image(gray_path + save_original, color_path + save_original, 
                              color_path + save_new, title)

In [16]:
Myregressor = Reg()
# Myregressor.reg_train(augmented_batch_train,device)

In [17]:
# Myregressor.reg_test(augmented_batch_train,device)

In [18]:
model_name = "C:/Users/aashish/Desktop/DLCG2/Graphs/Colorizer/Colorizer_sigmoid_epoch{0}_lr_{1}_weight_decay_{2}.pth"
Myregressor.train_colorizer(augmented_batch_train, "relu", model_name, device)

...Activation Function... relu
...colorizer training started...
Epoch: 100, lr: 0.001, weight decay:1e-05


NameError: name 'img_gray' is not defined

In [None]:
save_path = {'grayscale': 'C:/Users/aashish/Desktop/DLCG2/Graphs/Colorizer/outputs/gray1/', 'colorized': 'C:/Users/aashish/Desktop/DLCG2/Graphs/Colorizer/outputs/color1/'}

Myregressor.test_colorizer(augmented_batch_test, "relu", save_path, model_name, device)

In [None]:
# def scale_rgb(image):
#     scale = random.uniform(0.6,1)
#     scaled_rgb_image = image*scale
#     return scaled_rgb_image

# def flip():a
#     return random.choice([True,False])

# def augment_dataset(images, n):
#     num_images = images.shape[0]
#     augmented_data = torch.empty((n*num_images)+num_images, 3, 128, 128)
    
#     #random cropping
#     crop = transforms.Compose([transforms.RandomResizedCrop(128,128)])
#     horizontal_flip = transforms.Compose([transforms.RandomHorizontalFlip(p=.7)])
    
#     num = 0
    
#     #original training set
#     for i in images:
#         augmented_data[num] = torch.Tensor(np.array(image).astype(np.uint8))
#         num+=1
        
#     for i in range(n):
#         for image in images:
#             if flip:
#                 transformed_image = crop(image)
#             transformed_image = horizontal_flip(image)
#             transformed_image = scale_rgb(transformed_image)
#             augmented_data[num] = torch.Tensor(np.array(transformed_image).astype(np.uint8))
#             num+=1
#     return augmented_data

### Task 3 : convert to LAB color space

In [None]:
# def convert_to_LAB(images):
#     num_images = images.shape[0]
#     LAB_data = torch.empty(num_images,3,128,128)
    
#     images = images.permute(num_images, 3, 128, 128)
#     for image in enumerate(images):
#         image = np.array(image).astype(np.uint8)
#         imageLAB = cv2.cvtColor(image, COLOR_RGB2LAB)

In [None]:
# import torch.nn as nn

# class model():
#     def __init__(self):
#         super().__init__()
#         self.layer1 = nn.Sequential(nn.Conv2d(1, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer2 = nn.Sequential(nn.Conv2d(3, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer3 = nn.Sequential(nn.Conv2d(3, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer4 = nn.Sequential(nn.Conv2d(3, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer5 = nn.Sequential(nn.Conv2d(3, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer6 = nn.Sequential(nn.Conv2d(3, 3, kernel_size=2, stride=2, padding=0), nn.ReLu())
#         self.layer7 = nn.Linear(2*2*3,2)
#     def forward(self, X):
#         X = self.layer1(X)
#         X = self.layer2(X)
#         X = self.layer3(X)
#         X = self.layer4(X)
#         X = self.layer5(X)
#         X = self.layer6(X)
#         tensor.reshape(X, (12,1))
#         X = self.layer7(X)
#         return X

In [None]:
# LABdata = []
# for i in data:
#     imageLAB = cv.cvtColor(i, cv.COLOR_BGR2LAB)
#     LABdata.append(i)