In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
from tqdm.auto import tqdm
import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import models
from torch.utils.data import DataLoader, Dataset
import torch.utils.data as utils
from torchvision import transforms
import torch.nn.functional as F
from torchvision.utils import make_grid

In [2]:
# Dataset locations: both splits live under one root, each inside a
# doubled "<split>/<split>" folder.
_DATASET_ROOT = "/kaggle/input/rgb_2_thermal"
TRAIN_DIR = f"{_DATASET_ROOT}/train/train"
VAL_DIR = f"{_DATASET_ROOT}/val/val"

In [3]:
class rgb2thermal_Dataset(Dataset):
    """Paired RGB / thermal image dataset.

    Images are read from ``<base_path>/rgb`` and ``<base_path>/thermal``.
    Pairs are formed purely by position in the two alphabetically sorted
    directory listings, so both folders are expected to hold matching files
    (NOTE(review): this is not verified here - confirm against the data).

    Args:
        base_path: Directory containing the ``rgb`` and ``thermal`` folders.
        transform: Callable applied to each PIL image (same transform for
            both modalities). Defaults to a plain ``ToTensor``.
        isTrain: Stored on the instance; not used by this class itself.
    """

    def __init__(self,
                 base_path,
                 transform=transforms.Compose([transforms.ToTensor()]),
                 isTrain=False):
        self.base_path = base_path
        self.rgb_imgNames = sorted(os.listdir(os.path.join(base_path, "rgb")))
        self.thermal_imgNames = sorted(os.listdir(os.path.join(base_path, "thermal")))

        self.isTrain = isTrain
        self.transform = transform

    def _load(self, folder, name):
        """Open ``<base_path>/<folder>/<name>``, force 3-channel RGB and transform it."""
        # The networks in this notebook are built for 3 input and 3 output
        # channels, so grayscale / RGBA files are converted up front instead of
        # failing later with a channel mismatch. The context manager closes
        # the underlying file handle deterministically.
        with Image.open(os.path.join(self.base_path, folder, name)) as img:
            return self.transform(img.convert("RGB"))

    def __getitem__(self, idx):
        """Return ``{"rgb", "thermal", "fileName"}`` for the pair at ``idx``.

        ``fileName`` is the name of the RGB file.
        """
        rgb_imName = self.rgb_imgNames[idx]
        thermal_imName = self.thermal_imgNames[idx]

        rgb_tf = self._load("rgb", rgb_imName)
        thermal_tf = self._load("thermal", thermal_imName)

        return {"rgb": rgb_tf, "thermal": thermal_tf, "fileName": rgb_imName}

    def __len__(self):
        """Number of pairs, taken from the RGB listing."""
        return len(self.rgb_imgNames)

In [4]:
"""
transform = transforms.Compose([transforms.Resize((128,128)),
                                 transforms.ToTensor(),
                                 transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
                                ])
"""

# Active preprocessing: resize every image to 256x256, then convert to a tensor.
_preprocess_steps = [
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
]
transform = transforms.Compose(_preprocess_steps)

In [5]:
# Build one dataset per split; both share the same preprocessing pipeline.
train_data = rgb2thermal_Dataset(base_path=TRAIN_DIR, transform=transform, isTrain=True)
val_data = rgb2thermal_Dataset(base_path=VAL_DIR, transform=transform)

In [6]:
# Mini-batches of 16: training batches are reshuffled every epoch,
# validation batches keep the dataset order.
trainLoader = DataLoader(dataset=train_data, batch_size=16, shuffle=True)
valLoader = DataLoader(dataset=val_data, batch_size=16, shuffle=False)

In [7]:
#for data in tqdm(valLoader):
 #   img, thermal, fName = data['rgb'], data['thermal'], str(data['fileName'][0]).strip()

In [8]:
#print(img)

In [9]:
def random_crop(image, dim, crop_size=256):
    """Cut a random square window out of axes 1 and 2 of ``image``.

    Args:
        image: Array indexed as ``image[:, row, col, ...]``; every entry along
            axis 0 receives the same window.
        dim: Sequence whose first two entries are the height and width of
            ``image`` along axes 1 and 2 (further entries are ignored).
        crop_size: Side length of the window (default 256).

    Returns:
        ``image[:, top:top + crop_size, left:left + crop_size]``.

    Raises:
        ValueError: If the image is smaller than ``crop_size`` in either axis.
    """
    height, width = dim[0], dim[1]
    crop_size = int(crop_size)
    max_top = int(height) - crop_size
    max_left = int(width) - crop_size
    if max_top < 0 or max_left < 0:
        raise ValueError(
            f"cannot take a {crop_size}x{crop_size} crop from a {height}x{width} image"
        )
    # randint's upper bound is exclusive, so +1 makes the last valid offset
    # reachable (truncating a uniform float sample could never produce it).
    top = np.random.randint(0, max_top + 1)
    left = np.random.randint(0, max_left + 1)
    return image[:, top:top + crop_size, left:left + crop_size]
 
     
def random_jittering_mirroring(input_image, target_image, height=286, width=286):
    """Apply the same random jitter (resize + crop) and mirror to an image pair.

    Both images are resized to ``height`` x ``width`` with nearest-neighbour
    interpolation, cropped with one shared random window via ``random_crop``
    and, with probability 0.5, flipped left-right together.

    Args:
        input_image: Tensor exposing ``.numpy()``; the caller in this notebook
            passes a height x width x channel layout.
        target_image: Tensor with the same layout as ``input_image``.
        height: Height of the intermediate resized image.
        width: Width of the intermediate resized image.

    Returns:
        Tuple ``(input_image, target_image)`` of NumPy arrays. After a flip
        these are views produced by ``np.fliplr``, so callers should copy
        them before handing them to ``torch.from_numpy``.
    """
    input_image = input_image.numpy()
    target_image = target_image.numpy()

    # cv2.resize expects dsize as (width, height); passing (height, width)
    # only worked because both defaults are equal.
    dsize = (width, height)
    input_image = cv2.resize(input_image, dsize, interpolation=cv2.INTER_NEAREST)
    target_image = cv2.resize(target_image, dsize, interpolation=cv2.INTER_NEAREST)

    # Stack so that one random window is applied to both images.
    stacked_image = np.stack([input_image, target_image], axis=0)
    cropped_image = random_crop(stacked_image, dim=[height, width, 3])
    input_image, target_image = cropped_image[0], cropped_image[1]

    if torch.rand(()) > 0.5:
        # Random mirroring: flip both images so they stay aligned.
        input_image = np.fliplr(input_image)
        target_image = np.fliplr(target_image)
    return input_image, target_image

In [10]:
def normalize(inp, tar):
    """Rescale both images with ``x / 127.5 - 1`` (0 maps to -1, 255 maps to 1).

    Returns:
        Tuple ``(input_image, target_image)`` holding the rescaled values.
    """
    def _rescale(pixels):
        return (pixels / 127.5) - 1

    return _rescale(inp), _rescale(tar)

In [11]:
class Train(object):
    """Callable preprocessing pipeline: read, jitter/mirror, normalise, to tensors.

    NOTE(review): ``read_image`` is not defined in the visible part of this
    notebook, and nothing visible instantiates this class - confirm it is
    still needed before relying on it.
    """

    def __call__(self, image):
        """Return the processed ``(input, target)`` pair as channel-first tensors."""
        source, target = read_image(image)
        source, target = random_jittering_mirroring(source, target)
        source, target = normalize(source, target)

        def _to_channel_first(array):
            # copy() first: the array may be a flipped view.
            return torch.from_numpy(array.copy().transpose((2, 0, 1)))

        image_a = _to_channel_first(source)
        image_b = _to_channel_first(target)
        return image_a, image_b

In [12]:
def img_rand(inp,tar):
    """Randomly jitter and mirror one (input, target) tensor pair.

    Both tensors are moved from channel-first to channel-last layout, passed
    together through ``random_jittering_mirroring`` and converted back to
    channel-first tensors.

    Returns:
        Tuple ``(image_a, image_b)`` of augmented tensors.
    """
    def _to_channel_last(tensor):
        return torch.from_numpy(tensor.numpy().copy().transpose((1, 2, 0)))

    def _to_channel_first(array):
        # copy() materialises possibly flipped views before wrapping them.
        return torch.from_numpy(array.copy().transpose((2, 0, 1)))

    rgb_hwc = _to_channel_last(inp)
    thermal_hwc = _to_channel_last(tar)
    rgb_aug, thermal_aug = random_jittering_mirroring(rgb_hwc, thermal_hwc)
    return _to_channel_first(rgb_aug), _to_channel_first(thermal_aug)

In [13]:
# Custom weight initialisation applied recursively to every sub-module of a network.
def init_weights(net, init_type='normal', scaling=0.02):
    """Re-initialise convolution and BatchNorm2d layers of ``net`` in place.

    Args:
        net: Module whose sub-modules are visited through ``net.apply``.
        init_type: Only echoed in the log line; the scheme applied is always
            the normal distribution below.
        scaling: Standard deviation of the normal distribution.

    Layers whose class name contains ``Conv`` and that have a ``weight``
    attribute get weights drawn from N(0, scaling). ``BatchNorm2d`` layers get
    weights from N(1, scaling) and a zero bias.
    """
    def init_func(m):
        layer_kind = type(m).__name__
        is_conv = hasattr(m, 'weight') and 'Conv' in layer_kind
        if is_conv:
            torch.nn.init.normal_(m.weight.data, 0.0, scaling)
        elif 'BatchNorm2d' in layer_kind:
            # BatchNorm weight is a per-channel scale, so it is centred on 1.
            torch.nn.init.normal_(m.weight.data, 1.0, scaling)
            torch.nn.init.constant_(m.bias.data, 0.0)

    print('initialize network with %s' % init_type)
    net.apply(init_func)


In [14]:
class UnetGenerator(nn.Module):
    """U-Net generator assembled from nested ``UnetSkipConnectionBlock`` modules.

    The network is built inside-out: one innermost block, three blocks at
    ``nf * 8`` channels (the only ones that receive ``use_dropout``), three
    blocks narrowing the width from ``nf * 8`` down to ``nf``, and finally the
    outermost block that maps ``input_nc`` to ``output_nc`` channels.

    Args:
        input_nc: Number of channels of the input image.
        output_nc: Number of channels of the generated image.
        nf: Channel count of the outermost convolution.
        norm_layer: Factory for normalisation layers.
        use_dropout: Whether the ``nf * 8`` intermediate blocks add dropout.
    """

    def __init__(self, input_nc, output_nc, nf=64, norm_layer=nn.BatchNorm2d, use_dropout=False):
        super().__init__()
        widest = nf * 8

        # Innermost block (no submodule).
        block = UnetSkipConnectionBlock(widest, widest, input_nc=None, submodule=None,
                                        norm_layer=norm_layer, innermost=True)

        # Three intermediate blocks at the widest channel count.
        for _ in range(3):
            block = UnetSkipConnectionBlock(widest, widest, input_nc=None, submodule=block,
                                            norm_layer=norm_layer, use_dropout=use_dropout)

        # Narrow the channel count step by step: (outer, inner) multiples of nf.
        for outer_mult, inner_mult in ((4, 8), (2, 4), (1, 2)):
            block = UnetSkipConnectionBlock(nf * outer_mult, nf * inner_mult, input_nc=None,
                                            submodule=block, norm_layer=norm_layer)

        # Outermost block wraps everything and produces the output image.
        self.model = UnetSkipConnectionBlock(output_nc, nf, input_nc=input_nc, submodule=block,
                                             outermost=True, norm_layer=norm_layer)

    def forward(self, input):
        """Run ``input`` through the nested blocks."""
        return self.model(input)

In [15]:
class UnetSkipConnectionBlock(nn.Module):
    """One U-Net level: downsample -> submodule -> upsample, plus a skip connection.

    Args:
        outer_nc: Channels produced by the transposed convolution of this block.
        inner_nc: Channels produced by the strided convolution of this block.
        input_nc: Channels of the block input; defaults to ``outer_nc``.
        submodule: Nested block placed between the down and up paths.
        outermost: Build the outermost variant (no norm, ``Tanh`` output,
            no skip connection).
        innermost: Build the innermost variant (no submodule).
        norm_layer: Factory for normalisation layers.
        use_dropout: Append ``Dropout(0.5)`` (intermediate blocks only).
    """

    def __init__(self, outer_nc, inner_nc, input_nc=None,
                 submodule=None, outermost=False, innermost=False, norm_layer=nn.BatchNorm2d, use_dropout=False):
        super().__init__()
        self.outermost = outermost
        in_channels = outer_nc if input_nc is None else input_nc

        # Shared building blocks, created in a fixed order.
        encoder_conv = nn.Conv2d(in_channels, inner_nc, kernel_size=4,
                                 stride=2, padding=1, bias=False)
        encoder_act = nn.LeakyReLU(0.2, True)
        encoder_norm = norm_layer(inner_nc)
        decoder_act = nn.ReLU(True)
        decoder_norm = norm_layer(outer_nc)

        if outermost:
            # Input of the up-convolution is doubled by the nested block's skip concat.
            decoder_conv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
                                              kernel_size=4, stride=2,
                                              padding=1)
            layers = [encoder_conv, submodule, decoder_act, decoder_conv, nn.Tanh()]
        elif innermost:
            decoder_conv = nn.ConvTranspose2d(inner_nc, outer_nc,
                                              kernel_size=4, stride=2,
                                              padding=1, bias=False)
            layers = [encoder_act, encoder_conv, decoder_act, decoder_conv, decoder_norm]
        else:
            decoder_conv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
                                              kernel_size=4, stride=2,
                                              padding=1, bias=False)
            layers = [encoder_act, encoder_conv, encoder_norm,
                      submodule,
                      decoder_act, decoder_conv, decoder_norm]
            if use_dropout:
                layers.append(nn.Dropout(0.5))

        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the block output; non-outermost blocks concatenate ``x`` on the channel axis."""
        if self.outermost:
            return self.model(x)
        # Skip connection.
        return torch.cat([x, self.model(x)], 1)

In [16]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.device_count())

# Build the 3-channel -> 3-channel U-Net generator. It is moved with
# .to(device) rather than .cuda() so the cell also runs on CPU-only hosts,
# matching how the discriminator is placed.
generator = UnetGenerator(3, 3, 64, norm_layer=nn.BatchNorm2d, use_dropout=False).to(device).float()
init_weights(generator, 'normal', scaling=0.02)
generator = torch.nn.DataParallel(generator)  # multi-GPUs

2
initialize network with normal


In [17]:
class Discriminator(nn.Module):
    """Convolutional discriminator that outputs a one-channel map of probabilities.

    Layout: one stride-2 convolution without normalisation, ``n_layers - 1``
    stride-2 conv/norm/LeakyReLU stages whose width doubles (capped at
    ``ndf * 8``), one stride-1 stage, and a final stride-1 convolution to a
    single channel followed by ``Sigmoid``.

    Args:
        input_nc: Channels of the input.
        ndf: Channels produced by the first convolution.
        n_layers: Controls the number of strided stages (see layout above).
        norm_layer: Factory for normalisation layers.
    """

    def __init__(self, input_nc=6, ndf=32, n_layers=3, norm_layer=nn.BatchNorm2d):
        super().__init__()
        kernel, pad = 4, 1

        def _normed_stage(mult_in, mult_out, stride):
            # conv (no bias, the norm layer follows) -> norm -> LeakyReLU
            return [
                nn.Conv2d(ndf * mult_in, ndf * mult_out, kernel_size=kernel,
                          stride=stride, padding=pad, bias=False),
                norm_layer(ndf * mult_out),
                nn.LeakyReLU(0.2, True),
            ]

        layers = [
            nn.Conv2d(input_nc, ndf, kernel_size=kernel, stride=2, padding=pad),
            nn.LeakyReLU(0.2, True),
        ]

        mult = 1
        for depth in range(1, n_layers):
            # Double the width at each stage, capped at 8x.
            prev_mult, mult = mult, min(2 ** depth, 8)
            layers.extend(_normed_stage(prev_mult, mult, stride=2))

        prev_mult, mult = mult, min(2 ** n_layers, 8)
        layers.extend(_normed_stage(prev_mult, mult, stride=1))

        # Final one-channel prediction map squashed into (0, 1).
        layers.extend([
            nn.Conv2d(ndf * mult, 1, kernel_size=kernel, stride=1, padding=pad),
            nn.Sigmoid(),
        ])
        self.model = nn.Sequential(*layers)

    def forward(self, input):
        """Return the prediction map for ``input``."""
        return self.model(input)

In [18]:
# Loss criteria: binary cross-entropy for the adversarial term and
# mean absolute error for the reconstruction term.
adversarial_loss, l1_loss = nn.BCELoss(), nn.L1Loss()

In [19]:
# Build the discriminator and give it the same custom initialisation as the
# generator before moving it to the training device.
discriminator = Discriminator()
init_weights(discriminator, 'normal', scaling=0.02)
discriminator.to(device)

# Move the loss modules to the same device as the networks.
adversarial_loss.to(device)
l1_loss.to(device)

L1Loss()

In [20]:
def generator_loss(generated_image, target_img, G, real_target, l1_weight=100):
    """Total generator loss: adversarial term plus weighted L1 reconstruction term.

    Args:
        generated_image: Generator output.
        target_img: Ground-truth image the output is compared against.
        G: Discriminator prediction for the generated pair.
        real_target: Labels the generator wants the discriminator to predict.
        l1_weight: Multiplier of the L1 term (default 100, the value that was
            previously hard-coded).

    Returns:
        ``adversarial_loss(G, real_target) + l1_weight * l1_loss(generated_image, target_img)``.
    """
    gen_loss = adversarial_loss(G, real_target)
    l1_l = l1_loss(generated_image, target_img)
    return gen_loss + (l1_weight * l1_l)

In [21]:
def discriminator_loss(output, label):
    """Adversarial loss between the discriminator prediction ``output`` and ``label``."""
    return adversarial_loss(output, label)

In [22]:
# Generator and discriminator each get their own Adam optimizer with
# identical hyper-parameters.
_adam_settings = {"lr": 2e-4, "betas": (0.5, 0.999)}

G_optimizer = optim.Adam(generator.parameters(), **_adam_settings)
D_optimizer = optim.Adam(discriminator.parameters(), **_adam_settings)

In [23]:
# Adversarial training loop: one discriminator step followed by one generator
# step per batch.
num_epochs = 200
# Per-epoch mean losses, kept as plain floats so they can be plotted later.
D_loss_plot, G_loss_plot = [], []
for epoch in range(1, num_epochs+1):
    # Make sure both networks are in training mode even if an evaluation cell
    # switched them to eval() before this cell was (re-)run.
    generator.train()
    discriminator.train()

    D_loss_list, G_loss_list = [], []
    train_loss = 0
    for batch in trainLoader:
        input_img = batch['rgb']
        target_img = batch['thermal']

        # Augment every pair in the batch (on the CPU) with the same random
        # jitter/mirror for input and target, writing the result back in place.
        for j in range(input_img.size(0)):
            inp, tar = img_rand(input_img[j], target_img[j])
            input_img[j], target_img[j] = inp.clone().detach(), tar.clone().detach()

        input_img = input_img.to(device)
        target_img = target_img.to(device)

        # Generator forward pass.
        generated_image = generator(input_img)

        # ---- Discriminator step ----
        D_optimizer.zero_grad()

        # Fake pair: detach so this step does not backpropagate into the generator.
        disc_inp_fake = torch.cat((input_img, generated_image), 1)
        D_fake = discriminator(disc_inp_fake.detach())

        # Real pair.
        disc_inp_real = torch.cat((input_img, target_img), 1)
        D_real = discriminator(disc_inp_real)

        # Labels take their shape from the discriminator output instead of a
        # hard-coded 30x30 map, so other input sizes keep working.
        real_target = torch.ones_like(D_real)
        fake_target = torch.zeros_like(D_fake)

        D_fake_loss = discriminator_loss(D_fake, fake_target)
        D_real_loss = discriminator_loss(D_real, real_target)

        # Average discriminator loss.
        D_total_loss = (D_real_loss + D_fake_loss) / 2
        # Store a Python float: keeping the tensor would keep its autograd
        # graph (and device memory) referenced for the whole epoch.
        D_loss_list.append(D_total_loss.item())
        D_total_loss.backward()
        D_optimizer.step()

        # ---- Generator step (wants the discriminator to answer "real") ----
        G_optimizer.zero_grad()
        fake_gen = torch.cat((input_img, generated_image), 1)
        G = discriminator(fake_gen)
        G_loss = generator_loss(generated_image, target_img, G, real_target)
        G_loss_value = G_loss.item()
        G_loss_list.append(G_loss_value)
        train_loss += G_loss_value
        G_loss.backward()
        G_optimizer.step()
    train_loss /= len(trainLoader)

    D_loss_plot.append(sum(D_loss_list) / len(D_loss_list))
    G_loss_plot.append(sum(G_loss_list) / len(G_loss_list))

    if epoch % 5 == 0 or epoch == 1:
        print(f"EPOCH: {epoch }, train_loss: {train_loss}")

EPOCH: 1, train_loss: 9.73856692843967


KeyboardInterrupt: 

In [None]:
# Create the output folder for predictions; exist_ok keeps the cell
# re-runnable when the folder is already there.
os.makedirs("predictions", exist_ok=True)

In [None]:
def invTrans(img):
    """Return ``img`` multiplied by 255."""
    scaled = 255 * img
    return scaled

In [None]:
# Flatten an image into the 1-D list stored in the submission file.
def row_major_enc(img):
    """Flatten an ``H x W x C`` image into a flat list of Python scalars.

    The order is channel by channel; within a channel, column by column; and
    within a column, from the first row to the last. This is exactly the order
    of the original nested loops, despite the function's name.

    Args:
        img: Array-like with three dimensions ``(H, W, C)``.

    Returns:
        List of ``H * W * C`` plain Python numbers. Plain numbers (rather than
        NumPy scalars) keep the text written to the CSV independent of the
        NumPy scalar repr.

    Raises:
        ValueError: If ``img`` does not have exactly three dimensions.
    """
    arr = np.asarray(img)
    if arr.ndim != 3:
        raise ValueError(f"expected an H x W x C image, got shape {arr.shape}")
    # (H, W, C) -> (C, W, H); a C-order flatten then walks C, W, H.
    return arr.transpose(2, 1, 0).reshape(-1).tolist()

In [None]:
# Resize applied to generator outputs before they are written to disk.
T = transforms.Resize(size=(128, 128))

In [None]:
# One image per batch so every prediction is saved under its own file name.
valLoader2 = DataLoader(val_data, shuffle=False, batch_size=1)

generator.eval()
# Inference only: no_grad avoids building autograd graphs for every image.
with torch.no_grad():
    for data in tqdm(valLoader2):
        # The thermal target is not needed to produce predictions.
        img, fName = data['rgb'], str(data['fileName'][0]).strip()
        img = img.to(device)

        outputs = generator(img)
        out_resize = T(outputs)
        # Scale by 255 and convert the single batch item to height x width x channel.
        inv_tensor = invTrans(out_resize).cpu().numpy()[0].transpose(1,2,0)
        # NOTE(review): the array is floating point and may contain values
        # outside [0, 255]; this relies on cv2.imwrite converting it to 8-bit.
        cv2.imwrite(f"/kaggle/working/predictions/{fName}", inv_tensor)

In [None]:
# Creating a submission.csv
BASE = "/kaggle/working/predictions"  # wherever the predictions were saved
# Sort the listing so the row order of the CSV is reproducible.
val_rgb_lst = sorted(os.listdir(BASE))
subDict = {"ID": [], "RMImg": []}

for im_name in tqdm(val_rgb_lst):
    img = cv2.imread(os.path.join(BASE, im_name))
    if img is None:
        # cv2.imread returns None instead of raising for unreadable files.
        raise ValueError(f"could not read prediction image: {im_name}")
    rme_img = row_major_enc(img)

    subDict["ID"].append(im_name)
    subDict["RMImg"].append(rme_img)

# One row per prediction: file name plus its flattened pixel list.
subPd = pd.DataFrame(subDict)

subPd.to_csv("submission.csv", index=False)

In [26]:
# Cleanup: remove the .jpg files in predictions/ and any .pth files in the
# current working directory.
!rm -rf predictions/*.jpg *.pth

In [27]:
# Cleanup: remove the .jpg files in submissions/ and any .pth files in the
# current working directory.
# NOTE(review): no other visible cell writes to a "submissions" folder - confirm it exists.
!rm -rf submissions/*.jpg *.pth

In [None]:
"""
for (inputs, targets), _ in val_dl:
    inputs = inputs.to(device)
    generated_output =  generator(inputs)
    save_images(generated_output.data[:10], 'sample_%d'%epoch + '.png', nrow=5, normalize=True)
"""