Imports

In [25]:
import torch
from torch import nn , optim, Tensor
from torch.autograd.variable import Variable
from torch.utils.data import Dataset , DataLoader
from torchvision import transforms , datasets
from PIL import Image
import glob
# Prefer the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


Parameters and hyperparameters

In [26]:
# Number of image channels (RGB).
CHANNELS = 3
# Number of training epochs (tradeoff between reasonable training time and good results).
NUM_EPOCHS = 80
# Number of images per batch.
BATCH_SIZE = 16
# Image side length in pixels.
IMAGE_SIZE = 256
# Side length in pixels of each masked box.
MASK_SIZE = 75
# Save the models every 100 training batches.
CHECKPOINT_INTERVAL = 100

Generator Definition

In [27]:
# Generator model layers (for a detailed description, refer to the final report)
class Generator(nn.Module):
    """Convolutional encoder-decoder that maps an image to an image of the same size.

    Encoder: six stride-2 4x4 convolutions (each halves the spatial size),
    followed by an un-padded 4x4 convolution producing 4000 channels.
    Decoder: one stride-1 transposed convolution followed by six stride-2
    transposed convolutions (each doubles the spatial size), ending in Tanh so
    the output lies in [-1, 1].

    All layers live in one flat ``nn.Sequential`` (``self.generator_model``)
    so the parameter names in ``state_dict()`` are the positional indices
    ``generator_model.0`` ... ``generator_model.37``.
    """

    def __init__(self):
        super(Generator, self).__init__()

        # NOTE(review): the second positional argument of nn.BatchNorm2d is
        # `eps`, not `momentum`. The value 0.8 is passed as `eps` here exactly
        # as before (now spelled out) so behaviour is unchanged -- confirm
        # whether `momentum=0.8` was actually intended.
        def encoder_stage(c_in, c_out, normalize=True):
            """Downsampling stage: conv -> (batch norm) -> LeakyReLU(0.2)."""
            stage = [nn.Conv2d(c_in, c_out, 4, stride=2, padding=1)]
            if normalize:
                stage.append(nn.BatchNorm2d(c_out, eps=0.8))
            stage.append(nn.LeakyReLU(0.2))
            return stage

        def decoder_stage(c_in, c_out, stride=2, padding=1):
            """Upsampling stage: transposed conv -> batch norm -> ReLU."""
            return [
                nn.ConvTranspose2d(c_in, c_out, 4, stride=stride, padding=padding),
                nn.BatchNorm2d(c_out, eps=0.8),
                nn.ReLU(),
            ]

        layers = []

        # Encoder: the first stage has no normalisation.
        layers.extend(encoder_stage(CHANNELS, 64, normalize=False))
        for c_in, c_out in ((64, 64), (64, 128), (128, 128), (128, 256), (256, 512)):
            layers.extend(encoder_stage(c_in, c_out))

        # Bottleneck: un-padded 4x4 convolution, no activation.
        layers.append(nn.Conv2d(512, 4000, 4))

        # Decoder: the first stage uses stride 1 / padding 0 to undo the bottleneck.
        layers.extend(decoder_stage(4000, 512, stride=1, padding=0))
        for c_in, c_out in ((512, 256), (256, 128), (128, 128), (128, 64), (64, 64)):
            layers.extend(decoder_stage(c_in, c_out))

        # Output layer back to image channels, squashed to [-1, 1].
        layers.append(nn.ConvTranspose2d(64, CHANNELS, 4, stride=2, padding=1))
        layers.append(nn.Tanh())

        self.generator_model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the encoder-decoder stack and return the result."""
        return self.generator_model(x)

Discriminator Definition

In [28]:
# Discriminator model layers (for a detailed description, refer to the final report)
class Discriminator(nn.Module):
    """Fully convolutional discriminator producing a single-channel score map.

    Four stride-2 4x4 convolutions (each halves the spatial size) are followed
    by a 3x3 stride-1 convolution that reduces to one channel. No final
    activation is applied; the raw scores are returned.
    """

    def __init__(self):
        super(Discriminator, self).__init__()

        channel_plan = (CHANNELS, 64, 128, 256, 512)
        layers = []
        for stage, (c_in, c_out) in enumerate(zip(channel_plan, channel_plan[1:])):
            layers.append(nn.Conv2d(c_in, c_out, 4, 2, 1))
            # The very first stage is left un-normalised.
            if stage > 0:
                layers.append(nn.InstanceNorm2d(c_out))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Score head: keeps the spatial size, collapses to one channel.
        layers.append(nn.Conv2d(512, 1, 3, 1, 1))

        self.discriminator_model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the per-location score map for the image batch ``x``."""
        return self.discriminator_model(x)

Create models, initialize weights, loss functions and optimizers (partially commented)

In [29]:
# Initialize the weights for faster convergence
def init_weights(m):
    """Re-initialise one module in place; intended for ``model.apply(init_weights)``.

    The decision is made on the module's class name:
      * names containing 'Conv'      -> weights drawn from N(0.0, 0.02)
      * names containing 'BatchNorm' -> weights drawn from N(1.0, 0.02), bias set to 0
      * anything else                -> left untouched
    """
    layer_kind = type(m).__name__

    if 'Conv' in layer_kind:
        m.weight.data.normal_(0.0, 0.02)
        return

    if 'BatchNorm' in layer_kind:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)
        
# Create models and move them to the selected device
# (nn.Module.to moves the parameters in place and returns the module itself)
g_model = Generator().to(device)
d_model = Discriminator().to(device)

# UNCOMMENTING THE FOLLOWING 4 LINES WILL RE-INITIALIZE THE MODELS (do only for retraining)
# NOTE(review): these lines write state_dicts to "g_model.pth" / "d_model.pth",
# while the training loop saves whole model objects under the same file names.
# g_model.apply(init_weights)
# torch.save(g_model.state_dict() , "g_model.pth")
# d_model.apply(init_weights)
# torch.save(d_model.state_dict() , "d_model.pth")

# Loss functions used later: MSE for the adversarial term, L1 for reconstruction
lossMSE = nn.MSELoss()
lossL1 = nn.L1Loss()

# One Adam optimizer per network, with identical settings
g_optim = optim.Adam(
    g_model.parameters(),
    lr=0.0002,
    betas=(0.5, 0.999),
)
d_optim = optim.Adam(
    d_model.parameters(),
    lr=0.0002,
    betas=(0.5, 0.999),
)

Data Loader Definition

In [30]:
# Customized dataset class
# Version = 0 --> Training
# Version = 1 --> Testing
# For ease of use, the dataset path defaults to the hardcoded Kaggle location
class MyDataset(Dataset):
    """Masked / unmasked image dataset.

    version == 0 (training): every item is ``(masked_tensor, unmasked_tensor)``.
    any other version (testing): every item is ``(masked_tensor, file_path)``.

    Images are converted to RGB, turned into float tensors, normalised with
    mean 0.5 / std 0.5 per channel, and moved to the module-level ``device``.

    NOTE(review): because items are moved to ``device`` inside ``__getitem__``,
    this dataset is presumably only usable with ``DataLoader(num_workers=0)``
    when ``device`` is a GPU -- confirm before enabling worker processes.
    """

    # Sub-folders of Training_Data that hold the image pairs.
    TRAIN_CATEGORIES = ("Cat", "Dog", "Elephant", "Tiger")

    def __init__(self, version, dataset_path="/kaggle/input/photo-reconstruction/Dataset"):
        """Collect the image paths for the requested split.

        Args:
            version: 0 for the training split, anything else for the test split.
            dataset_path: root folder containing ``Training_Data`` and
                ``Testing_Data``. Defaults to the original hardcoded location.
        """
        self.converter = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((.5, .5, .5), (.5, .5, .5))
        ])
        self.version = version
        self.masked = None
        self.unmasked = None

        if self.version == 0:
            train_root = dataset_path + "/Training_Data/"
            self.unmasked = self._collect_images(
                [train_root + name + "/Unmasked_Train" for name in self.TRAIN_CATEGORIES]
            )
            self.masked = self._collect_images(
                [train_root + name + "/Masked_Train" for name in self.TRAIN_CATEGORIES]
            )
        else:
            self.masked = self._collect_images([dataset_path + "/Testing_Data"])

    @staticmethod
    def _collect_images(folders, extensions=(".jpg", ".jpeg", ".png")):
        """Return the image paths found directly inside ``folders``.

        Paths are grouped by folder, then by extension (in the given order).
        Each group is sorted, because ``glob`` gives no ordering guarantee and
        masked/unmasked images are paired purely by list position.
        NOTE(review): pairing assumes a masked file and its unmasked
        counterpart sort to the same position within their folders (e.g. they
        share a file name) -- confirm against the dataset layout.
        """
        paths = []
        for folder in folders:
            for ext in extensions:
                paths.extend(sorted(glob.glob(folder + "/*" + ext)))
        return paths

    def _load_tensor(self, path):
        """Load one image file as a normalised float tensor on ``device``."""
        # The context manager closes the file handle; convert("RGB") guarantees
        # the three channels that the Normalize transform expects.
        with Image.open(path) as img:
            tensor = self.converter(img.convert("RGB"))
        # .to() is not in-place for tensors, so its result must be used.
        return tensor.float().to(device)

    def __getitem__(self, index):
        """Return the item at ``index`` (indices wrap around the list length)."""
        filename = self.masked[index % len(self.masked)]
        img_masked_tensor = self._load_tensor(filename)
        if self.version == 0:
            # For training, return both the masked and unmasked tensors, since the discriminator needs both
            img_unmasked_tensor = self._load_tensor(self.unmasked[index % len(self.unmasked)])
            return img_masked_tensor, img_unmasked_tensor
        # For testing, return the masked version and the file name (file name needed for inpainting)
        return img_masked_tensor, filename

    def __len__(self):
        """Number of masked images in this split."""
        return len(self.masked)

Load data into dataloader

In [31]:
# Build the datasets: version 0 is the training split, version 1 the testing split
train_data = MyDataset(0)
test_data = MyDataset(1)

# Training batches are reshuffled; testing batches keep the dataset order
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

# Printed one per line, in this order:
#   number of training examples, number of testing examples,
#   number of testing batches (for reference)
print(len(train_data), len(test_data), len(test_loader), sep="\n")

7000
200
13


Training Loop

In [32]:
# COMMENT OUT THIS WHOLE BLOCK IF YOU DO NOT WISH TO TRAIN AGAIN
#
# Each batch performs one generator update followed by one discriminator update:
#   generator loss     = 0.001 * MSE(D(G(masked)), 1) + 0.999 * L1(G(masked), unmasked)
#   discriminator loss = 0.5 * MSE(D(unmasked), 1) + 0.5 * MSE(D(G(masked)), 0)
for epoch in range(NUM_EPOCHS):
    # Running totals are plain Python floats so that no autograd graph is
    # kept alive across batches.
    epoch_g_loss = 0.0    # Sum of losses of all batches in one epoch for generator
    epoch_d_loss = 0.0    # Sum of losses of all batches in one epoch for discriminator
    for i, (masked, unmasked) in enumerate(train_loader):
        # No-op when the dataset already yields tensors on `device`.
        masked = masked.to(device)
        unmasked = unmasked.to(device)

        # ---- Generator update ----
        g_optim.zero_grad()
        g_out = g_model(masked)     # Generate fake unmasked image, given real masked image
        d_on_fake = d_model(g_out)
        # Targets take the discriminator output's shape, dtype and device, so
        # nothing is tied to a fixed patch size or to CUDA being available.
        real_labels = torch.ones_like(d_on_fake)    # All 1's
        fake_labels = torch.zeros_like(d_on_fake)   # All 0's
        g_entropy = lossMSE(d_on_fake, real_labels)
        g_reconstruct = lossL1(g_out, unmasked)
        g_loss = 0.001 * g_entropy + 0.999 * g_reconstruct
        g_loss.backward()           # Backpropagate on generator
        g_optim.step()

        # ---- Discriminator update ----
        # zero_grad also clears the gradients the generator's backward pass
        # left on the discriminator parameters.
        d_optim.zero_grad()
        d_real = lossMSE(d_model(unmasked), real_labels)        # Make predictions on real unmasked image
        # detach() keeps the discriminator loss from backpropagating into the generator
        d_fake = lossMSE(d_model(g_out.detach()), fake_labels)  # Make predictions on fake unmasked image (generator's output)
        d_loss = 0.5 * d_real + 0.5 * d_fake
        d_loss.backward()           # Backpropagate on discriminator
        d_optim.step()

        g_loss_value = g_loss.item()
        d_loss_value = d_loss.item()
        epoch_g_loss += g_loss_value
        epoch_d_loss += d_loss_value

        # Save model after each checkpoint
        if i % CHECKPOINT_INTERVAL == 0:
            # Whole model objects are saved (not state_dicts) because the
            # testing cell restores them with a plain torch.load.
            torch.save(g_model, "g_model.pth")
            torch.save(d_model, "d_model.pth")
            print(f"Epoch = {epoch}, Batch = {i} , Generator Loss = {g_loss_value} , Discriminator Loss = {d_loss_value}")

    print(f"Epoch: {epoch} , Total Generator Loss = {epoch_g_loss} , Total Discriminator Loss = {epoch_d_loss}")

Epoch: 0 , Total Generator Loss = 143.85662841796875 , Total Discriminator Loss = 27.583545684814453
Epoch: 1 , Total Generator Loss = 120.1114501953125 , Total Discriminator Loss = 3.1595077514648438
Epoch: 2 , Total Generator Loss = 113.61994171142578 , Total Discriminator Loss = 1.7974348068237305
Epoch: 3 , Total Generator Loss = 109.79947662353516 , Total Discriminator Loss = 15.27480411529541
Epoch: 4 , Total Generator Loss = 106.36138916015625 , Total Discriminator Loss = 2.2146148681640625
Epoch: 5 , Total Generator Loss = 103.74385833740234 , Total Discriminator Loss = 0.961919903755188
Epoch: 6 , Total Generator Loss = 101.65727233886719 , Total Discriminator Loss = 7.520349025726318
Epoch: 7 , Total Generator Loss = 100.44464874267578 , Total Discriminator Loss = 1.2536975145339966
Epoch: 8 , Total Generator Loss = 97.4548568725586 , Total Discriminator Loss = 0.9105815291404724
Epoch: 9 , Total Generator Loss = 94.3191909790039 , Total Discriminator Loss = 0.795360743999481

Function to generate the report

In [33]:
import pandas as pd
# Per-file mask information for the test images; add_csv reads the columns
# 'filename', 'box1_row', 'box1_col', 'box2_row' and 'box2_col' from it.
df = pd.read_csv("/kaggle/input/photo-reconstruction/Dataset/Testing_Data/masked_info.csv")
# Report rows collected by add_csv; re-running this cell empties the list.
data = list()
# output is an image array indexed as output[channel][row][col] (3 x 256 x 256)
# filename is only the file name, not the path
def add_csv(output, filename, info=None, rows=None, mask_size=None):
    """Append the pixel values inside both masked boxes of one image to the report.

    For each box ('box1', then 'box2'), for each pixel of the square box in
    row-major order, and for each of the 3 channels, one entry
    ``['<filename>_<box>_<row>_<col>_<channel>', value]`` is appended.

    Args:
        output: image indexed as ``output[channel][row][col]``.
        filename: file name used to look up the box positions.
        info: DataFrame with columns 'filename', 'box1_row', 'box1_col',
            'box2_row', 'box2_col'. Defaults to the module-level ``df``.
        rows: list that receives the entries. Defaults to the module-level ``data``.
        mask_size: side length of each box in pixels. Defaults to ``MASK_SIZE``.

    Raises:
        IndexError: if ``filename`` has no entry in ``info``.
    """
    info = df if info is None else info
    rows = data if rows is None else rows
    size = MASK_SIZE if mask_size is None else mask_size

    # Look the file up once instead of re-filtering the frame for every column.
    matches = info[info['filename'] == filename]
    if matches.empty:
        raise IndexError(f"no mask entry found for file {filename!r}")
    entry = matches.iloc[0]

    for box in ('box1', 'box2'):
        top = int(entry[box + '_row'])
        left = int(entry[box + '_col'])
        for i in range(size):
            for j in range(size):
                row, col = top + i, left + j
                prefix = filename + '_' + box + '_' + str(row) + '_' + str(col) + '_'
                for channel in range(3):    # R, G, B
                    rows.append([prefix + str(channel), output[channel][row][col]])

Testing 

In [34]:
# NOTE: torch.load unpickles whole model objects here, which can execute
# arbitrary code -- only load checkpoint files you trust.
# map_location places the restored weights on the currently selected device,
# so the cell also works on a CPU-only machine.
g_model = torch.load("/kaggle/input/my-models/g_model.pth", map_location=device)
d_model = torch.load("/kaggle/input/my-models/d_model.pth", map_location=device)

import matplotlib.pyplot as plt

# NOTE(review): the models are used in whatever train/eval mode they were
# saved in; .eval() is deliberately not called here because it would change
# the generated pixel values -- confirm which mode is intended for inference.

# Generator outputs are collected per batch and concatenated once at the end
# (calling torch.cat inside the loop re-copies everything gathered so far).
generated_batches = []

with torch.no_grad():
    for i, (masked, filename) in enumerate(test_loader):
        # No-op when the dataset already yields tensors on `device`.
        g_out = g_model(masked.to(device))      # Use only generator now, for making fake images (with no holes)
        generated_batches.append(g_out)

        for j in range(len(g_out)):
            # Map [-1, 1] -> [0, 255]; add/div/mul/clamp are out-of-place, so no clone is needed.
            img = g_out[j].add(1).div(2).mul(255).clamp(0, 255).cpu().detach().numpy()
            add_csv(img/255 , filename[j].split('/')[-1])   # Log into report (values in [0, 1])
            # Uncomment all the following lines to see the full reconstructed images output by the generator
#             img = img.transpose(1, 2, 0).astype("uint8")
#             if i == 0:
#                 print(filename[j].split('/')[-1])
#                 print(img.shape)
#                 plt.figure()
#                 plt.imshow(img)

# All generated images stacked along the batch dimension (empty if there were no test batches).
outs = torch.cat(generated_batches, 0) if generated_batches else torch.empty(0, device=device)

#     print(Tensor.size(outs))
#     plt.show()

Sanity check that the correct model has been loaded (helper code, kept commented out; not needed for the report)

In [35]:
# with torch.no_grad():
#     for epoch in range(1):
#         for i, (masked , unmasked) in enumerate(train_loader):

#             real_labels = Variable(torch.cuda.FloatTensor(masked.size(0),1,16,16).fill_(1.0), requires_grad=False)
#             real_labels.to(device)

#             g_out = g_model(masked)
#             g_entropy = lossMSE(d_model(g_out) , real_labels)
#             g_reconstruct = lossL1(g_out , unmasked)
#             g_loss = 0.001 * g_entropy + 0.999 * g_reconstruct

#             if i % CHECKPOINT_INTERVAL == 0:
#                 print(f"Epoch = {epoch}, Batch = {i} , Generator Loss = {g_loss}")

#         print(f"{epoch} finished")

Save Report

In [36]:
# Turn the collected [label, value] rows into a two-column pandas DataFrame
final_df = pd.DataFrame(
    data,
    columns=['filename_box_pixel', 'Value'],
)
# Write the frame to report.csv without the index column
final_df.to_csv(
    'report.csv',
    index=False,
)