In [None]:
import os
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms, models
from PIL import Image
from torchbearer.cv_utils import DatasetValidationSplitter
from livelossplot import PlotLosses

from torch import nn
from tqdm import tqdm

In [None]:
import cv2
import numpy as np 

def padding(img, shape_r=480, shape_c=640, channels=3):
    """Letterbox ``img`` onto a zero-filled ``shape_r`` x ``shape_c`` canvas.

    The image is resized with ``cv2.resize`` so that one side spans the
    canvas, the other side is scaled by the same ratio (integer floor), and
    the result is centred along that shorter side.

    Args:
        img: array whose first two axes are rows and columns.
        shape_r: number of rows of the returned canvas.
        shape_c: number of columns of the returned canvas.
        channels: channel count of the canvas; ``1`` yields a 2-D canvas.

    Returns:
        ``np.uint8`` array of shape ``(shape_r, shape_c, channels)``, or
        ``(shape_r, shape_c)`` when ``channels == 1``.
    """
    if channels == 1:
        canvas = np.zeros((shape_r, shape_c), dtype=np.uint8)
    else:
        canvas = np.zeros((shape_r, shape_c, channels), dtype=np.uint8)

    src_rows, src_cols = img.shape[0], img.shape[1]
    height_ratio = src_rows / shape_r
    width_ratio = src_cols / shape_c

    if height_ratio > width_ratio:
        # Height is the limiting side: fill every row, centre the columns.
        fitted_cols = (src_cols * shape_r) // src_rows
        resized = cv2.resize(img, (fitted_cols, shape_r))
        fitted_cols = min(fitted_cols, shape_c)
        left = (canvas.shape[1] - fitted_cols) // 2
        canvas[:, left:left + fitted_cols] = resized
    else:
        # Width is the limiting side: fill every column, centre the rows.
        fitted_rows = (src_rows * shape_c) // src_cols
        resized = cv2.resize(img, (shape_c, fitted_rows))
        fitted_rows = min(fitted_rows, shape_r)
        top = (canvas.shape[0] - fitted_rows) // 2
        canvas[top:top + fitted_rows, :] = resized

    return canvas

In [None]:
# class CustomImageDataset(Dataset):
#     def __init__(self, imgs_path, fix_maps_path=None, transform=None, target_transform=None):
#         self.images = [os.path.join(imgs_path, category,img) for category in os.listdir(imgs_path)
#                                  for img in os.listdir(os.path.join(imgs_path, category)) if img.endswith('.jpg')]
#         self.maps = [os.path.join(fix_maps_path, category,img) for category in os.listdir(fix_maps_path)
#                                  for img in os.listdir(os.path.join(fix_maps_path, category)) if img.endswith('.jpg')] if fix_maps_path else None
#         self.transform = transform
#         self.target_transform = target_transform
#         self.norm = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

#     def __len__(self):
#         return len(self.images)

#     def __getitem__(self, idx):
#         image = cv2.imread(self.images[idx])
#         image = padding(image, image_size1, image_size2, 3).astype('float')
#         image = np.rollaxis(image, 2, 0)  
#         if self.maps:
#             fix_map = cv2.imread(self.maps[idx],0)
#             fix_map = padding(fix_map, shape_r_gt, shape_c_gt, 1).astype('float')
#         if self.transform:
# #             print(image.shape)
#             image = torch.tensor(image,dtype=torch.float)
#             if image.shape[0] == 1:
#                 image = image.expand(3,image_size1,image_size2)
#             image = self.norm(image)
#             if self.maps:
#                 fix_map = torch.tensor(fix_map,dtype=torch.float)
#                 fix_map = fix_map.repeat(1,8,8)
        
# #         print(image.shape, fix_map.shape)
#         catt = torch.cat([image, fix_map], 0)
#         return catt

In [None]:
class CustomImageDataset(Dataset):
    """Dataset of ``.jpg`` images, optionally paired with ``.png`` fixation maps.

    When maps are available each item is the transformed image with the
    transformed map concatenated after the image channels (dim 0). Without
    maps the item is the transformed image alone.

    Images and maps are paired by position in the *sorted* file listings.
    NOTE(review): this assumes both folders use matching file stems - confirm.
    """

    def __init__(self, imgs_path, fix_maps_path=None, transform=None, target_transform=None):
        """
        Args:
            imgs_path: folder scanned (non-recursively) for ``.jpg`` images.
            fix_maps_path: folder scanned for ``.png`` maps; ``None`` for an
                unlabeled dataset.
            transform: callable applied to each PIL image. It should return an
                un-normalised ``(C, H, W)`` tensor, because the mean/std
                normalisation in ``self.norm`` is applied right after it.
            target_transform: callable applied to each PIL map.
        """
        self.images = self._list_files(imgs_path, '.jpg')
        self.maps = self._list_files(fix_maps_path, '.png') if fix_maps_path else None
        self.transform = transform
        self.target_transform = target_transform
        self.norm = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

    @staticmethod
    def _list_files(folder, extension):
        """Return the sorted full paths of the files in ``folder`` ending with ``extension``."""
        # os.listdir order is arbitrary; sorting keeps image/map pairing stable.
        return sorted(
            os.path.join(folder, name)
            for name in os.listdir(folder)
            if name.endswith(extension)
        )

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Return the image tensor, with the map appended on dim 0 when maps exist."""
        image = Image.open(self.images[idx])
        if self.transform:
            image = self.transform(image)
            if image.shape[0] == 1:
                # Single-channel image: replicate to 3 channels, keeping whatever
                # spatial size the transform produced.
                image = image.expand(3, -1, -1)
            image = self.norm(image)

        if not self.maps:
            # Unlabeled dataset (e.g. the test split): nothing to concatenate.
            return image

        fix_map = Image.open(self.maps[idx])
        if self.target_transform:
            fix_map = self.target_transform(fix_map)
        return torch.cat([image, fix_map], 0)

In [None]:
# class CustomImageDataset(Dataset):
#     def __init__(self, imgs_path, fix_maps_path=None, transform=None, target_transform=None):
#         self.images = [os.path.join(imgs_path, category,img) for category in os.listdir(imgs_path)
#                                  for img in os.listdir(os.path.join(imgs_path, category)) if img.endswith('.jpg')]
#         self.maps = [os.path.join(fix_maps_path, category,img) for category in os.listdir(fix_maps_path)
#                                  for img in os.listdir(os.path.join(fix_maps_path, category)) if img.endswith('.jpg')] if fix_maps_path else None
#         self.transform = transform
#         self.target_transform = target_transform
#         self.norm = Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

#     def __len__(self):
#         return len(self.images)

#     def __getitem__(self, idx):
#         image = Image.open(self.images[idx])
#         if self.maps:
#             fix_map = Image.open(self.maps[idx])
#         if self.transform:
#             image = self.transform(image)
#             if image.shape[0] == 1:
#                 image = image.expand(3,480,640)
#             image = self.norm(image)
#             if self.maps:
#                 fix_map = self.target_transform(fix_map)
# #                 fix_map = fix_map.repeat(1,8,8)
        
# #         print(image.shape, fix_map.shape)
#         catt = torch.cat([image, fix_map], 0)
#         return catt / 255.0

In [None]:
from torchvision.transforms import ToTensor, Resize, Normalize
# Network input resolution (rows, cols).
image_size1 = 480
image_size2 = 640
# Ground-truth / prediction resolution: 1/8 of the input resolution.
shape_r_gt = image_size1 // 8
shape_c_gt = image_size2 // 8
# The learned prior is 10x smaller than the ground truth; MLNet upsamples it by 10.
prior_size = ( int(shape_r_gt / 10) , int(shape_c_gt / 10) )

# Image pipeline. It deliberately has no Normalize step: CustomImageDataset
# applies the same mean/std Normalize right after this transform, so keeping
# one here normalised every image twice (and a 3-channel Normalize would also
# fail on single-channel images before the dataset can expand them to 3 channels).
# NOTE(review): images are resized to the ground-truth resolution rather than
# image_size1 x image_size2 - confirm this is intended for MLNet's input.
transform1 = transforms.Compose([Resize((shape_r_gt, shape_c_gt)), ToTensor()])
# Fixation-map pipeline: resize and convert to a tensor.
transform2 = transforms.Compose([Resize((shape_r_gt, shape_c_gt)), ToTensor()])
# Resize only; the result stays a PIL image.
transform3 = transforms.Compose([Resize((shape_r_gt, shape_c_gt))])

root = 'MLNet-Pytorch/'

class Cat200Loader:
    """Creates the test/train/val datasets found under a root folder and a DataLoader for each."""

    def __init__(self, root_path, batch_size=4, frac_train_to_be_val=0.2):
        """
        Args:
            root_path: folder holding the ``test_images``, ``images``, ``train``,
                ``val_images`` and ``val`` sub-folders.
            batch_size: batch size shared by all three loaders.
            frac_train_to_be_val: accepted but not used by this class.
        """
        def subfolder(name):
            # Images and maps live in sibling sub-folders of the same root.
            return f'{root_path}/{name}/'

        self.datasets = {
            'test': CustomImageDataset(subfolder('test_images'), transform=transform1),
            'train': CustomImageDataset(
                subfolder('images'), subfolder('train'),
                transform=transform1, target_transform=transform2,
            ),
            'val': CustomImageDataset(
                subfolder('val_images'), subfolder('val'),
                transform=transform1, target_transform=transform2,
            ),
        }

        # train and val are shuffled, test keeps the dataset order.
        self.loaders = {
            split: DataLoader(
                self.datasets[split],
                batch_size=batch_size,
                shuffle=(split != 'test'),
                pin_memory=True,
            )
            for split in ('train', 'val', 'test')
        }

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Build the datasets and DataLoaders from the folders under `root`.
loaders = Cat200Loader(root)

In [None]:
# Sanity check: show the ground-truth maps of one training batch.
loaders = Cat200Loader(root)

for el in loaders.loaders['train']:
#     print(el[:,:-1,:,:].shape, el[:,-1,:,:].unsqueeze(1).shape)
    # The fixation map is the last channel of each concatenated sample.
    gt = el[:,-1,:120,:160].unsqueeze(1)
    # A stray `break` used to sit here, which made the plotting loop below unreachable.
    for y in gt:
        plt.imshow(y[0].data.cpu().numpy(),cmap='gray')
        plt.show()
        print ("Original")
    # Only the first batch is previewed.
    break

In [None]:
# Modified MSE Loss Function
class ModMSELoss(torch.nn.Module):
    """Max-normalised, label-weighted MSE plus a regulariser pulling the prior towards 1.

    NOTE: a later cell defines ``ModMSELoss`` again; after a top-to-bottom run
    that later definition is the one bound to the name.
    """

    def __init__(self,shape_r_gt,shape_c_gt):
        """Store the ground-truth map size (kept for interface compatibility)."""
        super(ModMSELoss, self).__init__()
        self.shape_r_gt = shape_r_gt
        self.shape_c_gt = shape_c_gt

    def forward(self, output , label , prior):
        """Compute the loss.

        Args:
            output: predicted maps, 4-D; dims 2 and 3 are the spatial dims.
            label: ground-truth maps, broadcastable against ``output``.
            prior: learned prior; its last two dims are the spatial dims.

        Returns:
            Scalar tensor: ``mean(((output / max) - label)**2 / (1.1 - label))``
            plus the summed prior regulariser.
        """
        # Per-map maximum kept as (B, C, 1, 1) so it broadcasts against any
        # output resolution instead of being expanded to a fixed size.
        output_max = output.max(dim=3, keepdim=True)[0].max(dim=2, keepdim=True)[0]

        # Samples whose maxima sum to zero would divide by zero below: offset
        # their maximum by 0.001. Computed on output's own device (no .cuda()).
        blank = output_max.reshape(output_max.shape[0], -1).sum(dim=1) == 0
        output_max = output_max + 0.001 * blank.view(-1, 1, 1, 1).to(output_max.dtype)

        # Scale by the prior's spatial size. The prior is (1, 1, h, w), so using
        # shape[0] * shape[1] always gave a scale of 1 instead of 1 / (h * w).
        reg = ( 1.0/(prior.shape[-2]*prior.shape[-1]) ) * ( 1 - prior)**2
        loss = torch.mean( ((output / output_max) - label)**2 / (1.1 - label) )  +  torch.sum(reg)
        return loss

In [None]:
# Modified MSE Loss Function
class ModMSELoss(torch.nn.Module):
    """Max-normalised, label-weighted MSE plus a regulariser pulling the prior towards 1.

    This definition shadows the ``ModMSELoss`` from the previous cell. It
    differs in how all-zero predictions are handled: the prediction itself is
    offset by 0.0001 before normalisation, so such a sample normalises to 1
    everywhere.
    """

    def __init__(self,shape_r_gt,shape_c_gt):
        """Store the ground-truth map size (kept for interface compatibility)."""
        super(ModMSELoss, self).__init__()
        self.shape_r_gt = shape_r_gt
        self.shape_c_gt = shape_c_gt

    def forward(self, output , label , prior):
        """Compute the loss.

        Args:
            output: predicted maps, 4-D; dims 2 and 3 are the spatial dims.
            label: ground-truth maps, broadcastable against ``output``.
            prior: learned prior; its last two dims are the spatial dims.

        Returns:
            Scalar tensor: ``mean(((output / max) - label)**2 / (1.1 - label))``
            plus the summed prior regulariser.
        """
        # Samples that sum to zero would give a zero maximum (division by zero):
        # lift every element of those samples by 0.0001. Everything stays on
        # output's own device, so this also runs without CUDA.
        blank = output.reshape(output.shape[0], -1).sum(dim=1) == 0
        output = output + 0.0001 * blank.view(-1, 1, 1, 1).to(output.dtype)

        # Per-map maximum kept as (B, C, 1, 1) so it broadcasts against any
        # output resolution instead of being expanded to a fixed size.
        output_max = output.max(dim=3, keepdim=True)[0].max(dim=2, keepdim=True)[0]

        # Scale by the prior's spatial size. The prior is (1, 1, h, w), so using
        # shape[0] * shape[1] always gave a scale of 1 instead of 1 / (h * w).
        reg = ( 1.0/(prior.shape[-2]*prior.shape[-1]) ) * ( 1 - prior)**2
        loss = torch.mean( ((output / output_max) - label)**2 / (1.1 - label) )  +  torch.sum(reg)
        return loss

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models


class MLNet(nn.Module):
    """Saliency network: multi-level VGG16 features, two convolutions and a learned prior.

    Feature maps are tapped after layers 16, 23 and 29 of the truncated VGG16
    stack, concatenated along the channel dim, reduced to one channel and
    multiplied element-wise by an upsampled learnable prior.
    """

    # Indices into ``self.features`` whose outputs are collected in ``forward``.
    _TAP_LAYERS = (16, 23, 29)

    def __init__(self,prior_size):
        """
        Args:
            prior_size: ``(rows, cols)`` of the learnable prior before the
                10x bilinear upsampling.
        """
        super().__init__()

        # Pre-trained VGG16 feature stack with its last layer (the final
        # max-pool) dropped.
        vgg_layers = list(models.vgg16(pretrained = True).features)[:-1]

        # Re-configure layer 23 to kernel 5 / stride 1 / padding 2 so that it
        # no longer changes the spatial size.
        same_size_pool = vgg_layers[23]
        same_size_pool.stride = 1
        same_size_pool.kernel_size = 5
        same_size_pool.padding = 2

        self.features = nn.ModuleList(vgg_layers).eval()
        # Channel-wise dropout applied to the concatenated feature maps.
        self.fddropout = nn.Dropout2d(p=0.5)
        # 1280 concatenated channels -> 64 -> 1.
        self.int_conv = nn.Conv2d(1280, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
        self.pre_final_conv = nn.Conv2d(64, 1, kernel_size=(1, 1), stride=(1, 1), padding=(0, 0))
        # Learnable prior, initialised to ones.
        self.prior = nn.Parameter(torch.ones((1, 1, prior_size[0], prior_size[1]), requires_grad=True))
        # Brings the prior up to the resolution of the prediction.
        self.bilinearup = torch.nn.UpsamplingBilinear2d(scale_factor=10)

    def forward(self, x):
        """Return the non-negative single-channel map predicted for batch ``x``."""
        tapped = []
        for index, layer in enumerate(self.features):
            x = layer(x)
            if index in self._TAP_LAYERS:
                tapped.append(x)

        # Stack the tapped feature maps along the channel dimension.
        merged = torch.cat(tapped, 1)
        merged = self.fddropout(merged)

        reduced = self.pre_final_conv(self.int_conv(merged))

        # Element-wise weighting by the upsampled prior, then ReLU.
        weighted = reduced * self.bilinearup(self.prior)
        return torch.nn.functional.relu(weighted, inplace=True)


In [None]:
class Trainer:
    """Runs train/val epochs for a model and streams the epoch losses to livelossplot.

    Relies on the notebook-level names ``tqdm``, ``plt``, ``PlotLosses``,
    ``shape_r_gt`` and ``shape_c_gt``.
    """

    def __init__(self, model, criterion, optimizer, loaders):
        """
        Args:
            model: network exposing a ``prior`` attribute that is passed to the criterion.
            criterion: loss module called as ``criterion(y_pred, y_true, prior)``.
            optimizer: optimizer stepping the model's parameters.
            loaders: object whose ``loaders`` dict maps ``'train'`` / ``'val'`` to DataLoaders.
        """
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        self.criterion = criterion.to(self.device)
        self.optimizer = optimizer
        self.loaders = loaders

    def run_trainer(self, epochs):
        """Train for ``epochs`` epochs, validating and updating the live plot after each."""
        liveloss = PlotLosses()
        for epoch in range(epochs):
            self.logs = {}

            self.model.train()
            self.run_epoch('train', epoch)

            self.model.eval()
            with torch.no_grad():
                self.run_epoch('val', epoch)

            liveloss.update(self.logs)
            liveloss.send()

    def run_epoch(self, phase, epoch):
        """Run one pass over ``phase`` and store its mean loss in ``self.logs``.

        Args:
            phase: ``'train'`` (weights are updated) or ``'val'`` (the first
                image channel and the prediction of every batch are plotted).
            epoch: epoch index; not used inside this method.
        """
        running_loss = 0.0
        for x in tqdm(self.loaders.loaders[phase]):
            # Each sample is the image with its fixation map appended as the
            # LAST channel, so the target is channel -1 (channel 1 is still
            # part of the image).
            x_true = x[:, :-1, :, :]
            y_true = x[:, -1, :shape_r_gt, :shape_c_gt].unsqueeze(1)
            x_true, y_true = x_true.to(self.device), y_true.to(self.device)

            y_pred = self.model(x_true)
            loss = self.criterion(y_pred, y_true, self.model.prior.clone())

            if phase == 'train':
                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

            # Weight by batch size so the epoch figure is a per-sample mean.
            running_loss += loss.item() * x_true.size(0)

            if phase == 'val':
                plt.imshow(x_true[0][0].data.cpu().numpy(),cmap='gray')
                plt.show()
                plt.imshow(y_pred[0][0].data.cpu().numpy(),cmap='gray')
                plt.show()

        epoch_loss = running_loss / len(self.loaders.loaders[phase].dataset)
        self.logs[f'{phase}_loss'] = epoch_loss
                

In [None]:
model = MLNet(prior_size)


# Freeze the feature layers below index `last_freeze_layer`.
# The loop walks model.features by layer index. Enumerating model.parameters()
# instead counts parameter tensors, and the first of those is the learnable
# `prior`, so the prior itself ended up frozen.
last_freeze_layer = 23
for layer_idx, layer in enumerate(model.features):
    if layer_idx < last_freeze_layer:
        for param in layer.parameters():
            param.requires_grad = False


criterion = ModMSELoss(shape_r_gt,shape_c_gt)

optimizer = torch.optim.SGD(model.parameters(), lr=1e-3,weight_decay=0.0005,momentum=0.9,nesterov=True)
# optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-3, weight_decay=1e-4)

loaders = Cat200Loader('cat2000')

In [None]:
# Bundle model, loss, optimizer and data loaders; Trainer moves the model and loss to its device.
trainer = Trainer(model, criterion, optimizer, loaders)

In [None]:
# Train for 3 epochs; each epoch is followed by a validation pass and a live-plot update.
trainer.run_trainer(3)

### Display

In [None]:
import matplotlib.pyplot as plt

# Number of validation batches to visualise (the first sample of each batch is shown).
# The validation loader is shuffled, so the selection differs between runs.
no_visual = 15


visual_cnt = 0
model.eval()
with torch.no_grad():
    for x in trainer.loaders.loaders['val']:
        # Image channels come first; the ground-truth map is the last channel.
        x_true, y_true = x[:,:-1,:,:], x[:,-1,:shape_r_gt,:shape_c_gt].unsqueeze(1)
        # Use the trainer's device so this also runs without CUDA.
        x_true = x_true.to(trainer.device)
        y_pred = model(x_true)

        print ("Original")
        # NOTE: the image is shown as fed to the network, i.e. still
        # mean/std-normalised, so colours are off and matplotlib clips
        # values outside [0, 1].
        plt.imshow(x_true[0].data.cpu().numpy().transpose(1, 2, 0))
        plt.show()
        print ("predicted")
        plt.imshow(y_pred[0].squeeze(0).data.cpu().numpy(),cmap='gray')
        plt.show()
        print ("Ground truth")
        plt.imshow(y_true[0][0],cmap='gray')
        plt.show()

        visual_cnt += 1
        # `>=` stops after exactly `no_visual` batches (`>` showed one extra).
        if visual_cnt >= no_visual:
            break