# Markdown

In [1]:
import cv2 as cv
from model import *
import numpy as np
from PIL import Image
import os
import matplotlib.pyplot as plt
import torch.nn as nn
import torch
import torch.optim as optim
import torch.nn.functional as F
import time
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder, DatasetFolder
from torch.utils.data import DataLoader, TensorDataset, Subset
from sklearn.model_selection import train_test_split

In [2]:
# Prefer the GPU when CUDA is usable; otherwise all tensors stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
device

device(type='cuda')

In [3]:
class DualImageFolderDataset(torch.utils.data.Dataset):
    """Dataset that pairs images from two ``ImageFolder`` trees by index.

    Item ``idx`` is ``(input_image, output_image)``: the first element of the
    ``idx``-th sample of each folder (the second element of every
    ``ImageFolder`` sample is dropped). The same ``transform`` is handed to
    both folders.

    Args:
        root_input: Root directory passed to ``ImageFolder`` for the input side.
        root_output: Root directory passed to ``ImageFolder`` for the output side.
        transform: Optional transform forwarded to both ``ImageFolder`` objects.
    """

    def __init__(self, root_input, root_output, transform=None):
        self.input_data = ImageFolder(root=root_input, transform=transform)
        self.output_data = ImageFolder(root=root_output, transform=transform)

    def __len__(self):
        # Only indices that exist in BOTH folders can be served as a pair.
        # Reporting the input length alone made __getitem__ raise IndexError
        # whenever the output folder held fewer images.
        return min(len(self.input_data), len(self.output_data))

    def __getitem__(self, idx):
        # Each underlying sample is a tuple; element 0 is the image.
        input_image = self.input_data[idx][0]
        output_image = self.output_data[idx][0]
        return input_image, output_image

In [4]:
# Root directories of the two image collections that get paired by index.
root_input = 'Images_jet/input'
root_output = 'Images_jet/output'

# Shared preprocessing: force every image to 160x160, then convert to a tensor.
transform = transforms.Compose([transforms.Resize((160, 160)), transforms.ToTensor()])

dataset = DualImageFolderDataset(
    root_input=root_input,
    root_output=root_output,
    transform=transform,
)

In [5]:
# Hold out 10% of the sample indices for testing; random_state pins the split
# so the same samples land in each subset on every run.
train_indices, test_indices = train_test_split(range(len(dataset)), test_size=0.1, random_state=40)

# Index-restricted views over the same underlying dataset
train_dataset = Subset(dataset, train_indices)
test_dataset = Subset(dataset, test_indices)

# Number of image pairs per batch
batch_size = 64

# Training batches are reshuffled every epoch: with shuffle=False the model saw
# the exact same batch composition in every epoch. The test loader keeps a
# fixed order so evaluation output is repeatable.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# train_loader and test_loader each yield (input_batch, output_batch) tuples

In [6]:
def create_models(g_conv_dim = 64 , d_conv_dim = 64 , n_res_blocks = 6):
    '''Build the two generators and the two discriminators.

    Args:
        g_conv_dim: ``conv_dim`` passed to both ``Generator`` instances.
        d_conv_dim: ``conv_dim`` passed to both ``Discriminator`` instances.
        n_res_blocks: ``res_blocks`` passed to both ``Generator`` instances.

    Returns:
        The tuple ``(G_XtoY, G_YtoX, D_X, D_Y)``. When CUDA is available all
        four networks have been moved to the GPU before being returned.
    '''

    # Generators
    G_XtoY = Generator(conv_dim=g_conv_dim, res_blocks=n_res_blocks)
    G_YtoX = Generator(conv_dim=g_conv_dim, res_blocks=n_res_blocks)

    # Discriminators are sized by d_conv_dim. They used to receive g_conv_dim,
    # which silently ignored the d_conv_dim argument.
    D_X = Discriminator(conv_dim=d_conv_dim)
    D_Y = Discriminator(conv_dim=d_conv_dim)

    if torch.cuda.is_available():
        for net in (G_XtoY, G_YtoX, D_X, D_Y):
            net.cuda()
        print("Models moved to GPU")
    else:
        print("Only Cpu Available")

    return G_XtoY, G_YtoX, D_X, D_Y
        
def print_model_arch(model , model_name):
    '''Print ``model_name``, then the printed form of ``model`` framed by two star rulers.'''
    divider = "******************************************************************"
    # One print call per output line; empty strings produce the blank lines.
    for chunk in (model_name, "", divider, model, "", divider, ""):
        print(chunk)

In [7]:
# Instantiate the four networks and keep them both as a list and under their own names
model_list = list(create_models())
G_XtoY, G_YtoX, D_X, D_Y = model_list
model_name_list = ['G_XtoY', 'G_YtoX', 'D_X', 'D_Y']

# Print every architecture under its label
for name, model in zip(model_name_list, model_list):
    print_model_arch(model, name)

Models moved to GPU
G_XtoY

******************************************************************
Generator(
  (conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (conv2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (conv3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (residual_layers): Sequential(
    (0): residual_blocks(
      (conv1): Sequential(
        (0): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )


In [8]:
#function to save the models at checkpoints

def save_at_checkpoints(G_XtoY , G_YtoX , D_X , D_Y , save_dir = 'model_save'):
    '''Save the ``state_dict`` of both generators and both discriminators.

    Each network is written with ``torch.save`` to ``<save_dir>/<name>.pkl``,
    where ``<name>`` is one of ``G_XtoY``, ``G_YtoX``, ``D_X``, ``D_Y``.

    Args:
        G_XtoY, G_YtoX, D_X, D_Y: Modules whose ``state_dict()`` is saved.
        save_dir: Target directory. It is created (including parents) when it
            does not exist yet; an empty string means the current directory.
    '''
    # torch.save does not create missing directories, so the first checkpoint
    # of a fresh run used to fail. An empty save_dir is skipped because
    # os.makedirs('') raises.
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    checkpoints = (
        ('G_XtoY.pkl', G_XtoY),
        ('G_YtoX.pkl', G_YtoX),
        ('D_X.pkl', D_X),
        ('D_Y.pkl', D_Y),
    )
    for file_name, net in checkpoints:
        torch.save(net.state_dict(), os.path.join(save_dir, file_name))
    
def load_parameters(G_XtoY , G_YtoX , D_X , D_Y , save_dir = 'model_save'):
    '''Load saved parameters into both generators and both discriminators in place.

    Reads ``<save_dir>/<name>.pkl`` for ``G_XtoY``, ``G_YtoX``, ``D_X`` and
    ``D_Y`` and passes each loaded object to the matching module's
    ``load_state_dict``.

    Args:
        G_XtoY, G_YtoX, D_X, D_Y: Modules that receive the stored parameters.
        save_dir: Directory that holds the four ``.pkl`` files.
    '''
    checkpoints = (
        ('G_XtoY.pkl', G_XtoY),
        ('G_YtoX.pkl', G_YtoX),
        ('D_X.pkl', D_X),
        ('D_Y.pkl', D_Y),
    )
    for file_name, net in checkpoints:
        # map_location="cpu" lets checkpoints written on a GPU machine be read
        # on a CPU-only one; load_state_dict then copies the values into the
        # module's existing parameters on whatever device they live.
        # NOTE(review): torch.load unpickles the file - only load checkpoints
        # you produced yourself.
        state = torch.load(os.path.join(save_dir, file_name), map_location="cpu")
        net.load_state_dict(state)

In [9]:
# Overwrite the freshly initialised weights with the checkpoint in the default save directory
load_parameters(G_XtoY=G_XtoY, G_YtoX=G_YtoX, D_X=D_X, D_Y=D_Y)

In [10]:
#Defining the losses

def real_mse_loss(D_out):
    '''Mean squared distance of ``D_out`` from the "real" target value 1.'''
    squared_error = (D_out - 1).pow(2)
    return squared_error.mean()

def fake_mse_loss(D_out):
    '''Mean squared distance of ``D_out`` from the "fake" target value 0.'''
    squared_error = (D_out - 0).pow(2)
    return squared_error.mean()

def cycle_cons_loss(real_im , recons_im , lambda_wt):
    '''Mean absolute difference between ``real_im`` and ``recons_im``, scaled by ``lambda_wt``.'''
    mean_abs_error = (real_im - recons_im).abs().mean()
    return lambda_wt * mean_abs_error

In [16]:
def merge_images(sources , targets , batch_size= 16):
    '''Tile source/target image pairs into one grid image.

    The grid has ``row = int(sqrt(batch_size))`` rows with ``row`` pairs per
    row. Within a pair the source image is on the left and the target image
    directly to its right.

    Args:
        sources: Batch with a 4-element ``.shape`` ``(N, C, H, W)``; iterating
            it yields one image at a time.
        targets: Batch of the same layout, paired with ``sources`` by position.
        batch_size: Number of pairs the grid is sized for.

    Returns:
        A numpy array of shape ``(row*H, row*W*2, 3)`` (channels last). Cells
        without an image stay zero. Pairs beyond the ``row*row`` grid
        capacity are ignored.
    '''
    _, _, h, w = sources.shape
    row = int(np.sqrt(batch_size))
    merged = np.zeros([3, row*h, row*w*2])
    capacity = row * row
    for idx, (s, t) in enumerate(zip(sources, targets)):
        if idx >= capacity:
            # No cell left in the grid; previously this raised a broadcast error.
            break
        i, j = divmod(idx, row)
        top = i * h
        # Horizontal offsets must use the image WIDTH; the old code used the
        # height, which only worked for square images.
        left = 2 * j * w
        merged[:, top:top + h, left:left + w] = s
        merged[:, top:top + h, left + w:left + 2 * w] = t
    return merged.transpose(1, 2, 0)

In [11]:
# Hyper-parameters and Adam optimizers for the generators and discriminators

lr = 0.0008
beta1, beta2 = 0.5, 0.999
n_epochs = 20
last_epoch = 350  # offset added to the epoch number in the checkpoint message
# Both generators are updated together by a single optimizer.
gen_params = [*G_XtoY.parameters(), *G_YtoX.parameters()]

g_optimizer = optim.Adam(gen_params, lr=lr, betas=(beta1, beta2))
d_x_optimizer = optim.Adam(D_X.parameters(), lr=lr, betas=(beta1, beta2))
d_y_optimizer = optim.Adam(D_Y.parameters(), lr=lr, betas=(beta1, beta2))
losses = []


In [None]:
# Training loop: for every batch, update D_X, then D_Y, then both generators.
G_XtoY.train()
G_YtoX.train()
for epoch in range(1, n_epochs+1):

    for images_X, images_Y in train_loader:
        images_X, images_Y = images_X.to(device), images_Y.to(device)

        # ============================================
        #            TRAIN THE DISCRIMINATORS
        # ============================================

        ##   First: D_X, real and fake loss components   ##
        d_x_optimizer.zero_grad()

        # 1. Compute the discriminator loss on real images
        out_x = D_X(images_X)
        D_X_real_loss = real_mse_loss(out_x)

        # 2. Generate fake images that look like domain X based on real images in domain Y.
        #    Detached: the discriminator update must not backpropagate into the
        #    generator. Those generator gradients were discarded by
        #    g_optimizer.zero_grad() anyway, so skipping them only saves work.
        fake_X = G_YtoX(images_Y).detach()

        # 3. Compute the fake loss for D_X
        out_x = D_X(fake_X)
        D_X_fake_loss = fake_mse_loss(out_x)

        # 4. Compute the total loss and perform backprop
        d_x_loss = D_X_real_loss + D_X_fake_loss
        d_x_loss.backward()
        d_x_optimizer.step()


        ##   Second: D_Y, real and fake loss components   ##
        d_y_optimizer.zero_grad()

        # 1. Compute the discriminator loss on real images
        out_y = D_Y(images_Y)
        D_Y_real_loss = real_mse_loss(out_y)

        # 2. Generate fake images that look like domain Y based on real images in domain X
        #    (detached for the same reason as fake_X above)
        fake_Y = G_XtoY(images_X).detach()

        # 3. Compute the fake loss for D_Y
        out_y = D_Y(fake_Y)
        D_Y_fake_loss = fake_mse_loss(out_y)

        # 4. Compute the total loss and perform backprop
        d_y_loss = D_Y_real_loss + D_Y_fake_loss
        d_y_loss.backward()
        d_y_optimizer.step()


        # =========================================
        #            TRAIN THE GENERATORS
        # =========================================

        ## First: generate fake X images and reconstructed Y images    ##
        g_optimizer.zero_grad()

        # 1. Generate fake images that look like domain X based on real images in domain Y
        #    (not detached here: the generator loss must flow back into G_YtoX)
        fake_X = G_YtoX(images_Y)

        # 2. Compute the generator loss based on domain X
        out_x = D_X(fake_X)
        g_YtoX_loss = real_mse_loss(out_x)

        # 3. Create a reconstructed y
        # 4. Compute the cycle consistency loss (the reconstruction loss)
        reconstructed_Y = G_XtoY(fake_X)
        reconstructed_y_loss = cycle_cons_loss(images_Y, reconstructed_Y, lambda_wt=10)


        ## Second: generate fake Y images and reconstructed X images    ##

        # 1. Generate fake images that look like domain Y based on real images in domain X
        fake_Y = G_XtoY(images_X)

        # 2. Compute the generator loss based on domain Y
        out_y = D_Y(fake_Y)
        g_XtoY_loss = real_mse_loss(out_y)

        # 3. Create a reconstructed x
        # 4. Compute the cycle consistency loss (the reconstruction loss)
        reconstructed_X = G_YtoX(fake_Y)
        reconstructed_x_loss = cycle_cons_loss(images_X, reconstructed_X, lambda_wt=10)

        # 5. Add up all generator and reconstructed losses and perform backprop
        g_total_loss = g_YtoX_loss + g_XtoY_loss + reconstructed_y_loss + reconstructed_x_loss
        g_total_loss.backward()
        g_optimizer.step()


    # Every 5th epoch: checkpoint the models and log the losses of the LAST batch of the epoch.
    if epoch % 5 == 0:
        # Save first so the message below is only printed once the files are written.
        save_at_checkpoints(G_XtoY , G_YtoX , D_X , D_Y)
        print("{} epochs training completed and saved model".format(epoch+last_epoch))
        losses.append((d_x_loss.item(), d_y_loss.item(), g_total_loss.item()))
        print('Epoch [{:5d}/{:5d}] | d_X_loss: {:6.4f} | d_Y_loss: {:6.4f} | g_total_loss: {:6.4f}'.format(
                epoch, n_epochs, d_x_loss.item(), d_y_loss.item(), g_total_loss.item()))



In [14]:
# Run the X->Y generator on the first test batch.
G_XtoY.eval()
test_iter = iter(test_loader)
batch = next(test_iter)
x, y = batch
x = x.to(device)
# Inference only: disabling autograd avoids building a graph for the whole batch.
with torch.no_grad():
    y_pred = G_XtoY(x)
print(y_pred.shape)

torch.Size([64, 3, 160, 160])


In [15]:
# Bring the predictions back to the CPU and reorder both batches from
# (N, C, H, W) to (N, H, W, C).
# NOTE: the names are rebound in place, so running this cell twice permutes the axes again.
y_pred = y_pred.cpu().permute(0, 2, 3, 1)
y = y.permute(0, 2, 3, 1)

In [None]:
# First eight predictions (top row) against the first eight target images (bottom row)
first_8_images_tensor1 = y_pred[:8].detach().numpy()
first_8_images_tensor2 = y[:8].detach().numpy()

fig, axes = plt.subplots(2, 8, figsize=(16, 4))

# Row 0 shows the first tensor, row 1 the second; axis ticks and labels are hidden on every panel.
for row, images in enumerate((first_8_images_tensor1, first_8_images_tensor2)):
    for i in range(8):
        panel = axes[row, i]
        panel.imshow(images[i])
        panel.axis('off')

plt.show()