In [1]:
import cv2
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import glob
# For our model
import torchvision.models as models
import matplotlib.pyplot as plt
import numpy as np
# For utilities
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torch.nn.functional import normalize
from sys import platform
from itertools import combinations
import torchvision.transforms as T
import time

In [12]:
def load(folder):
    files = glob.glob(folder)
    data =[]
    for f in files:
        image = cv2.imread(f)
        data.append(image)
    return data


def group(data, album_length):
    #group into chunks of three because of three sets of images in LAB color space
    for i in range (0, album_length, 3):
        yield image_data[i:i+3]


class SplitLab(object):
    """Splits tensor LAB image to L and ab channels."""
    def __call__(self, image):
        L  = image[:1,:,:]
        ab = image[1:,:,:]
        return (L, ab)
    

class ImageDataset(Dataset):
    """
       Custom dataset for LAB image preprocessing for colorization
    """
    def __init__(self, images):
       """
       images: array of lab images [idx, h, w, channels],
       where for channels 0 = L, 1 = a, 2 = b.
       """
       self.images = images
       self.composed = T.Compose([T.ToPILImage(),T.ToTensor(),SplitLab])

        
    def __len__(self):
        return len(self.images)
      
    
    def __getitem__(self, idx):
        """
        returns L, ab mean, and ab for visualization
        """
        image = self.images[idx, :, :, :]
        L, ab = self.composed(image)

        a_mean = torch.mean(ab[0, :, :])
        b_mean = torch.mean(ab[1, :, :])

        sample = {'L': L, 'ab': ab, 'a_mean': a_mean, 'b_mean': b_mean}
        
        return sample

In [13]:
home_dir = os.getcwd() 
#change this parameter depending on which album you want
target_album = 'LAB_TEST_FACES'
batch_size = 32

# load in data and split into test and train sets
image_data = np.asarray(load(home_dir + '/' + target_album + '/' + '*.jpg'))
train_images, test_images = train_test_split(image_data, test_size = 0.1, random_state=42)

# dataset and dataloader to prepare images for training
train_dataset = ImageDataset(train_images)
test_dataset = ImageDataset(test_images)

train_loader = DataLoader(dataset = train_dataset, batch_size = batch_size, shuffle=True)
test_loader = DataLoader(dataset = test_dataset, batch_size = batch_size, shuffle=True)

In [11]:
# select GPU / CPU
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
print('Device:', device)

Device: cuda:0


In [12]:
class ChrominanceReg(nn.Module):
    def __init__(self):
        super(ChrominanceReg, self).__init__()
        
        #128x128
        self.mod1 = nn.Sequential(
             nn.Conv2d(3, 6, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #64x64
        self.mod2 = nn.Sequential(
             nn.Conv2d(6, 12, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #32x32
        self.mod3 = nn.Sequential(
             nn.Conv2d(12, 24, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #16x16
        self.mod4 = nn.Sequential(
             nn.Conv2d(24, 48, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #8x8
        self.mod5 = nn.Sequential(
             nn.Conv2d(48, 96, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #4x4
        self.mod6 = nn.Sequential(
             nn.Conv2d(96, 192, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.AvgPool2d(kernel_size = (1,1), stride = 1)
             )
        #2x2
        self.mod7 = nn.Sequential(
          
             nn.Conv2d(192, 384, kernel_size = 3, stride = 2, padding = 1),
             nn.ReLU(),
             nn.Flatten(),
             nn.Linear(1,2)
             )
        
    def forward(self, x):
     #    #Normalize input first
     #    make_Tensor = T.ToTensor()
     # #   transform_pil = T.ToPILImage()
       
     #    out = make_Tensor(x)
     #    #out = transform_pil(x)
     #    out = normalize(out)
        out = self.mod1(x)
        out = self.mod2(out)
        out = self.mod3(out)
        out = self.mod4(out)
        out = self.mod5(out)
        out = self.mod6(out)
        out = self.mod7(out)
        out = torch.mean(out,0, True)
        return out


regressor = ChrominanceReg().to(device)
print(regressor)

ChrominanceReg(
  (mod1): Sequential(
    (0): Conv2d(3, 6, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(1, 1), stride=1, padding=0)
  )
  (mod2): Sequential(
    (0): Conv2d(6, 12, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(1, 1), stride=1, padding=0)
  )
  (mod3): Sequential(
    (0): Conv2d(12, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(1, 1), stride=1, padding=0)
  )
  (mod4): Sequential(
    (0): Conv2d(24, 48, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(1, 1), stride=1, padding=0)
  )
  (mod5): Sequential(
    (0): Conv2d(48, 96, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (1): ReLU()
    (2): AvgPool2d(kernel_size=(1, 1), stride=1, padding=0)
  )
  (mod6): Sequential(
    (0): Conv2d(96, 192, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    

In [None]:
epochs = 10
lr = 0.01
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(regressor.parameters(), lr)

regressor.train()

for epoch in range(epochs):  # loop over the dataset multiple times

    running_loss = 0.0
    for data in train_loader:
        # get data
        L = data['L']
        a_mean = data['a_mean']
        b_mean = data['b_mean']

        # send to device
        L = L.to(device)
        a_mean = a_mean.to(device)
        b_mean = b_mean.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        outputs = regressor(L)
        loss = loss_fn(outputs[0], labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
    else:
        print("Epoch {} - Training loss: {}".format(epoch, running_loss/len(train_loader)))

    if epoch == 30 or epoch == 27 or epoch == 25:
        y_pred = []
        y_true = []

        with torch.no_grad():
            regressor.eval()
            correct, total = 0, 0
            for data in test_loader:
                outputs = regressor(data['L'].to(device))
                _, prediction = torch.max(outputs[0], 1)
                y_pred.extend(prediction.cpu()) # Save Prediction
                y_true.extend(data['label']) # Save Truth
                for i in range(len(data['label'])):
                    # print(prediction[i].cpu(), data['label'][i])
                    if(prediction[i]==data['label'][i]):
                        correct += 1
                    total += 1

        print("Number Of Images Tested =", total)
        print("\nModel Accuracy =", (correct/total))
print("\nTraining Finished. \nTraining Time (in minutes) =",(time()-time0)/60)

