# Segregate Train and test data from face_images

In [1]:
import cv2
import os
import glob
from random import shuffle
import shutil
import math
test_dir = "blue_cis6930/nghosh/Test"
train_dir = "blue_cis6930/nghosh/Train"
input_dir = "face_images"
if not os.path.isdir("blue_cis6930"):
    os.mkdir("blue_cis6930")
    os.mkdir("blue_cis6930/nghosh")

if not os.path.isdir(test_dir) and not os.path.isdir(train_dir):
    os.mkdir(train_dir)
    os.mkdir(test_dir)
inside_test = os.listdir(test_dir)
inside_train = os.listdir(train_dir)
if len(inside_test) == 0 or len(inside_train) == 0:
    files = [os.path.join(input_dir,fle) for fle in os.listdir(input_dir)]
    dst_dir = train_dir
    file_count = sum([len(files) for r, d, files in os.walk(input_dir)])
    train_num = math.ceil(0.9*float(file_count))
    test_num = file_count - train_num
    num_copied = 0
    shuffle(files)
    for fle in files:
      if os.path.isfile(fle):
          with open(fle) as f:
            if num_copied == train_num:
              dst_dir = test_dir
            shutil.copy(fle, dst_dir)
            num_copied += 1
    print("Data Segregated successfully!!")

# Data Loading & augmentation

In [2]:
import torch
import torchvision
from PIL import Image, ImageMath
import glob
import cv2
from torch.utils.data.dataset import Dataset
import random
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt

%matplotlib inline

KeyboardInterrupt: 

In [None]:
# Define device for Cuda
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Custom class RandomScale to randomly scale the values of RGB image between 0.6, 1
# Used for data augmentation

class RandomScale(object):
    def __call__(self, img):
        rand = torch.tensor(random.uniform(0.6,1.0), dtype=torch.float32)
        img = (torchvision.transforms.ToTensor()(img)).float()
        img = img*rand
        return img
    def __repr__(self):
        return self.__class__.__name__ + '()'

In [None]:
# Utility function to convert RGB to lab image
# Post conversion, returns L and mean values for a & b channels
def convertRGBToLAB(img):
  lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB).astype("float32")
  l,a,b = cv2.split(lab)
  return (l, np.mean(a), np.mean(b))

In [None]:
# Custom Dataset that applies random scale, random horizontal flip and
# random crop to each train image 10 times
# Resize each augmented image to 128x128

class ColorDataset(Dataset):
  def __init__(self, folder_path, transform=None):
    self.image_list = glob.glob(folder_path)
    self.transform = transform
    self.T = []
    for img in self.image_list:
      single_img = Image.open(img)
      
      if self.transform !=None:
        for i in range(10):
          tran = self.transform(single_img)
          l,a,b = convertRGBToLAB(np.asarray(tran.permute(1,2,0)))
          l = l/100
          self.T.append((l,a,b))
    self.data_len = len(self.T)
       
  def __getitem__(self, index):
    return self.T[index]

  def __len__(self):
    return self.data_len


In [None]:
# Load the train dataset after creating 10 augmented & resized image per input train image

transforms = torchvision.transforms.Compose([
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.RandomResizedCrop(128),
    RandomScale(),
    torchvision.transforms.ToPILImage(),
    torchvision.transforms.ToTensor()
])
train_dir = "blue_cis6930/nghosh/Train/*"
dataset = ColorDataset(train_dir, transforms)
train_loader = torch.utils.data.DataLoader(dataset=dataset, shuffle=True, batch_size=100)



# Network definition & Training

In [None]:
# Network Defination for the Simple regressor

class RegNetwork(nn.Module):
  def __init__(self):
    super(RegNetwork, self).__init__()    
    self.layers = nn.Sequential(
        nn.Conv2d(1,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,3,kernel_size=3,stride=2,padding=1),
        nn.LeakyReLU(),
        nn.Conv2d(3,2,kernel_size=3,stride=2,padding=1)
    )
  def forward(self, inputVal):
      inputVal = self.layers(inputVal)       
      return inputVal

In [None]:
# Initialize the model as the simple regressor network
model = RegNetwork().to(device)
print(model)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Function to train the network
def train(grayImage, targetVals, criterion, optimizer, model):
  predValues = model(grayImage)
  loss_train = criterion(predValues, targetVals)
  optimizer.zero_grad()
  loss_train.backward()
  optimizer.step()
  return loss_train.item()


In [None]:
# Train the network for 200 epochs 
# Display the graph for training loss error
loss_list = []
for epoch in range(200):
  totalLoss = 0
  avgLoss = 0
  for gImage,a,b in train_loader:
    gImage = torch.unsqueeze(gImage, 1)
    targetVals = torch.empty(a.shape[0], 2, 1, 1)
    targetVals[:,0, :, :] = torch.unsqueeze(torch.unsqueeze(a, -1), -2)
    targetVals[:,1, :, :] = torch.unsqueeze(torch.unsqueeze(b, -1), -2)
    if torch.cuda.is_available():
      gImage = gImage.to(device)
      targetVals = targetVals.to(device)
    totalLoss = totalLoss + train(gImage,targetVals, criterion, optimizer,model)
  avgLoss = totalLoss / len(train_loader)
  loss_list.append(avgLoss)
  print("Loss in epoch ", epoch, " is ", avgLoss)
plt.plot(np.array(loss_list), 'r')
  
  