In [None]:
import torch
import torch.nn as nn


In [None]:
class CNNBlock(nn.Module):
    """Discriminator building block: 4x4 Conv2d -> BatchNorm2d -> LeakyReLU(0.2).

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of channels produced by the convolution.
        stride: stride of the convolution.
    """

    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        self.conv = nn.Sequential(
            # padding=1 is required for padding_mode="reflect" to have any
            # effect; with the default padding=0 nothing is padded at all and
            # every block shrinks the feature map more than intended.
            # The conv bias is omitted because BatchNorm2d follows directly.
            nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=4,
                stride=stride,
                padding=1,
                bias=False,
                padding_mode="reflect",
            ),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2),
        )

    def forward(self, x):
        """Apply convolution, batch normalisation and activation to ``x``."""
        return self.conv(x)
class Discriminator (nn.Module):
    """Conditional discriminator that scores an (input, target) image pair.

    The two images are concatenated along the channel axis, passed through an
    initial stride-2 convolution, a stack of ``CNNBlock`` layers and a final
    convolution that produces a 1-channel map of raw (un-sigmoided) scores.

    Args:
        in_channels: channels of EACH of the two images; the first convolution
            therefore receives ``in_channels * 2`` channels.
        features: channel widths of the successive stages. The first entry is
            used by the initial convolution, the rest by ``CNNBlock`` layers.
    """

    def __init__(self, in_channels=3, features=(64, 128, 256, 512)):
        super().__init__()
        # Immutable default + local copy: avoids the shared-mutable-default
        # pitfall while still accepting lists from existing callers.
        features = list(features)
        self.initial = nn.Sequential(
            nn.Conv2d(in_channels * 2, features[0], kernel_size=4, stride=2, padding=1, padding_mode="reflect"),
            nn.LeakyReLU(0.2),
        )
        layers = []
        in_channels = features[0]
        last_index = len(features) - 1
        for index, feature in enumerate(features[1:], start=1):
            # Only the LAST stage uses stride 1. Decide by position, not by
            # value: comparing against features[-1] would also select earlier
            # stages that merely share the same channel width.
            stride = 1 if index == last_index else 2
            layers.append(CNNBlock(in_channels, feature, stride=stride))
            in_channels = feature
        layers.append(nn.Conv2d(in_channels, 1, kernel_size=4, stride=1, padding=1, padding_mode="reflect"))
        self.model = nn.Sequential(*layers)

    def forward(self, x, y):
        """Return the score map for input ``x`` paired with candidate ``y``.

        Both tensors are concatenated on dim 1, so they must agree in every
        other dimension.
        """
        x = torch.cat([x, y], dim=1)
        x = self.initial(x)
        return self.model(x)

def test():
    """Smoke test: run one random image pair through a default Discriminator.

    Prints the shape of the resulting score map; nothing is returned.
    """
    image_shape = (1, 3, 286, 286)
    x = torch.randn(image_shape)
    y = torch.randn(image_shape)
    preds = Discriminator()(x, y)
    print(preds.shape)
    
# Run the discriminator smoke test when this code is executed directly.
if __name__ == "__main__":
    test()
    

In [None]:
class Block(nn.Module):
    """Generator building block that halves or doubles spatial resolution.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels of the produced feature map.
        down: if truthy use a stride-2 ``Conv2d`` (reflect padding),
            otherwise a stride-2 ``ConvTranspose2d``.
        act: ``"relu"`` selects ``ReLU``; any other value selects
            ``LeakyReLU(0.2)``.
        use_dropout: whether ``forward`` applies ``Dropout(0.5)`` to the result.
    """

    def __init__(self, in_channels, out_channels, down=True, act="relu", use_dropout=False):
        super().__init__()
        if down:
            resample = nn.Conv2d(
                in_channels, out_channels,
                kernel_size=4, stride=2, padding=1,
                bias=False, padding_mode="reflect",
            )
        else:
            resample = nn.ConvTranspose2d(
                in_channels, out_channels,
                kernel_size=4, stride=2, padding=1,
                bias=False,
            )
        normalise = nn.BatchNorm2d(out_channels)
        activation = nn.ReLU() if act == "relu" else nn.LeakyReLU(0.2)

        self.conv = nn.Sequential(resample, normalise, activation)
        self.use_dropout = use_dropout
        # The dropout module is always created; it is only used when requested.
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Resample ``x`` and optionally apply dropout."""
        out = self.conv(x)
        if not self.use_dropout:
            return out
        return self.dropout(out)


class Generator(nn.Module):
    """Encoder-decoder generator with skip connections between mirrored stages.

    The encoder is an initial convolution, six ``Block`` down stages and a
    bottleneck convolution. The decoder is seven ``Block`` up stages (all with
    dropout enabled) and a final transposed convolution followed by ``Tanh``.
    From ``up2`` on, every decoder stage receives the previous decoder output
    concatenated with the matching encoder output.

    Args:
        in_channels: channels of the input image (and of the output image).
        features: base channel width; deeper stages use multiples of it.
    """

    def __init__(self, in_channels=3, features=64):
        super().__init__()
        f = features

        self.initial_down = nn.Sequential(
            nn.Conv2d(in_channels, f, kernel_size=4, stride=2, padding=1, padding_mode="reflect"),
            nn.LeakyReLU(0.2),
        )

        # (in, out) channel pairs for down1 .. down6
        encoder_channels = [
            (f, f * 2),
            (f * 2, f * 4),
            (f * 4, f * 8),
            (f * 8, f * 8),
            (f * 8, f * 8),
            (f * 8, f * 8),
        ]
        for stage, (c_in, c_out) in enumerate(encoder_channels, start=1):
            setattr(self, f"down{stage}", Block(c_in, c_out, down=True, act="leaky", use_dropout=False))

        self.bottleneck = nn.Sequential(
            nn.Conv2d(f * 8, f * 8, kernel_size=4, stride=2, padding=1, padding_mode="reflect"),
            nn.ReLU(),
        )

        # (in, out) channel pairs for up1 .. up7; inputs from up2 onwards are
        # doubled because a skip tensor is concatenated before each stage.
        decoder_channels = [
            (f * 8, f * 8),
            (f * 8 * 2, f * 8),
            (f * 8 * 2, f * 8),
            (f * 8 * 2, f * 8),
            (f * 8 * 2, f * 4),
            (f * 4 * 2, f * 2),
            (f * 2 * 2, f),
        ]
        for stage, (c_in, c_out) in enumerate(decoder_channels, start=1):
            setattr(self, f"up{stage}", Block(c_in, c_out, down=False, act="relu", use_dropout=True))

        self.final_up = nn.Sequential(
            nn.ConvTranspose2d(f * 2, in_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        """Map input ``x`` to an output image with values squashed by ``Tanh``."""
        # Encoder: collect every stage's output for later reuse as a skip.
        skips = [self.initial_down(x)]
        for stage in range(1, 7):
            skips.append(getattr(self, f"down{stage}")(skips[-1]))

        out = self.up1(self.bottleneck(skips[-1]))

        # Decoder: consume the skips from deepest to shallowest.
        for stage in range(2, 8):
            out = getattr(self, f"up{stage}")(torch.cat([out, skips.pop()], 1))
        return self.final_up(torch.cat([out, skips.pop()], 1))


def test():
    """Smoke test: run a random batch through the Generator and print the output shape."""
    batch = torch.randn((32, 3, 256, 256))
    generator = Generator(in_channels=3, features=64)
    print(generator(batch).shape)

# Run the generator smoke test when this code is executed directly.
if __name__ == '__main__':
    test()


In [None]:
import cv2
import numpy as np
import glob
import os

# Split every side-by-side training image into its left half ("data") and
# right half ("labels"), saved under the same file name in two sibling folders.
image_paths = glob.glob(os.path.join('/kaggle/input/picstopics/PICStoPICS/maps/train/train', '*.jpg'))

# NOTE(review): on Kaggle, /kaggle/input is normally mounted read-only, so the
# writes below may fail there - confirm. The dataset cell reads from this same
# location, so both must be changed together if the output is relocated.
output_dir = '/kaggle/input/picstopics/PICStoPICS/maps/train'
data_dir = os.path.join(output_dir, 'data')
label_dir = os.path.join(output_dir, 'labels')

# Create the target folders once, up front, instead of re-checking per image.
os.makedirs(data_dir, exist_ok=True)
os.makedirs(label_dir, exist_ok=True)

failed_paths = []
for image_path in image_paths:
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals an unreadable/corrupt file by returning None.
        failed_paths.append(image_path)
        continue

    width = image.shape[1]
    half_width = width // 2

    data_roi = image[:, :half_width]
    label_roi = image[:, half_width:]

    file_name = os.path.basename(image_path)
    data_roi_path = os.path.join(data_dir, file_name)
    label_roi_path = os.path.join(label_dir, file_name)

    # cv2.imwrite reports failure through its return value; do not ignore it.
    data_written = cv2.imwrite(data_roi_path, data_roi)
    label_written = cv2.imwrite(label_roi_path, label_roi)
    if not (data_written and label_written):
        failed_paths.append(image_path)

if failed_paths:
    print(
        f"Warning: {len(failed_paths)} of {len(image_paths)} images could not be "
        f"read or written (first: {failed_paths[0]})"
    )




In [None]:
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

class DataLabelDataset(Dataset):
    """Dataset of (input image, label image) pairs stored in two folders.

    The i-th file (in sorted order) of ``data_dir_data`` is paired with the
    i-th file (in sorted order) of ``data_dir_labels``, so matching images are
    expected to carry matching file names.

    Args:
        data_dir_data: folder containing the input images.
        data_dir_labels: folder containing the label images.
        transform: optional callable applied to both images of a pair.

    Raises:
        ValueError: if the two folders hold a different number of files.
    """

    def __init__(self, data_dir_data, data_dir_labels, transform=None):
        self.data_dir_data = data_dir_data
        self.data_dir_labels = data_dir_labels
        # os.listdir returns entries in arbitrary order; sort both listings so
        # index i refers to the same sample in each folder.
        self.data_files = sorted(os.listdir(data_dir_data))
        self.label_files = sorted(os.listdir(data_dir_labels))
        if len(self.data_files) != len(self.label_files):
            raise ValueError(
                f"Found {len(self.data_files)} data files but "
                f"{len(self.label_files)} label files; the folders must match."
            )
        self.transform = transform

    def __len__(self):
        """Number of image pairs."""
        return len(self.data_files)

    def __getitem__(self, idx):
        """Return the ``(data_image, label_image)`` pair at position ``idx``."""
        data_img_name = os.path.join(self.data_dir_data, self.data_files[idx])
        label_img_name = os.path.join(self.data_dir_labels, self.label_files[idx])

        # Load inside a context manager so file handles are released, and
        # force 3 channels so a 3-channel Normalize works for every file.
        with Image.open(data_img_name) as img:
            data_image = img.convert("RGB")
        with Image.open(label_img_name) as img:
            label_image = img.convert("RGB")

        if self.transform:
            data_image = self.transform(data_image)
            label_image = self.transform(label_image)

        return data_image, label_image

# Folders holding the two halves of every training image.
data_dir_data = '/kaggle/input/picstopics/PICStoPICS/maps/train/data'
data_dir_labels = '/kaggle/input/picstopics/PICStoPICS/maps/train/labels'

# Convert to tensor, normalise each channel with mean 0.5 / std 0.5, then
# resize to 256x256.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
        transforms.Resize((256, 256)),
    ]
)

data_label_dataset = DataLabelDataset(data_dir_data, data_dir_labels, transform=transform)

train_loader = DataLoader(
    data_label_dataset,
    batch_size=32,
    shuffle=True,
    drop_last=True,
)

# Sanity check: print the tensor shapes of the first batch, if there is one.
first_batch = next(iter(train_loader), None)
if first_batch is not None:
    x, y = first_batch
    print(x.shape,y.shape)
    



In [None]:

def train(epochs=300, l1_lambda=1.0):
    """Train the generator/discriminator pair on ``train_loader``.

    Each batch first updates the discriminator (real pair -> 1, generated
    pair -> 0) and then the generator (fool the discriminator + L1 distance to
    the target image).

    Args:
        epochs: number of passes over ``train_loader``.
        l1_lambda: weight of the L1 term in the generator loss. The default of
            1.0 keeps the original weighting; the pix2pix paper uses 100.

    Returns:
        Tuple ``(Disc_loss_final, Gen_loss_final, disc, gen, predicted_images)``:
        per-epoch mean discriminator and generator losses (lists of 0-dim
        tensors), the two trained models, and a list holding the generator's
        output (on CPU) for one batch drawn from ``train_loader``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    disc = Discriminator(in_channels=3).to(device)
    gen = Generator(in_channels=3, features=64).to(device)
    optim_disc = torch.optim.Adam(disc.parameters(), lr=0.0002, betas=(0.5, 0.999))
    optim_gen = torch.optim.Adam(gen.parameters(), lr=0.0002, betas=(0.5, 0.999))
    Disc_loss_final = []
    Gen_loss_final = []
    predicted_images = []
    BCE = nn.BCEWithLogitsLoss()
    L1_LOSS = nn.L1Loss()

    for epochi in range(epochs):
        # Collected per EPOCH (not per batch) so the reported value is the
        # mean over all batches rather than just the last one.
        disc_losses = []
        gen_losses = []

        for data, map_data in train_loader:
            data, map_data = data.to(device), map_data.to(device)

            # The generator is conditioned on the input image. Feeding it
            # random noise would make the L1 term against map_data
            # meaningless and would hard-code the batch size.
            pred_images = gen(data)

            # Train Discriminator (generator output detached: no generator
            # gradients are needed for this step).
            pred_real_disc = disc(data, map_data)
            pred_fake_disc = disc(data, pred_images.detach())
            real_loss_disc = BCE(pred_real_disc, torch.ones_like(pred_real_disc))
            fake_loss_disc = BCE(pred_fake_disc, torch.zeros_like(pred_fake_disc))
            Disc_loss = (real_loss_disc + fake_loss_disc) / 2
            disc_losses.append(Disc_loss.item())
            optim_disc.zero_grad()
            Disc_loss.backward()
            optim_disc.step()

            # Train Generator. pred_images must NOT be detached here,
            # otherwise the adversarial loss cannot reach the generator.
            pred_fake_disc = disc(data, pred_images)
            gen_fake_loss = BCE(pred_fake_disc, torch.ones_like(pred_fake_disc))
            l1 = L1_LOSS(pred_images, map_data) * l1_lambda
            gen_loss = gen_fake_loss + l1
            gen_losses.append(gen_loss.item())
            optim_gen.zero_grad()
            gen_loss.backward()
            optim_gen.step()

        # Skip the bookkeeping when the loader produced no batches.
        if disc_losses:
            Disc_loss_final.append(torch.mean(torch.tensor(disc_losses)))
            Gen_loss_final.append(torch.mean(torch.tensor(gen_losses)))

        if (epochi + 1) % 2 == 0:
            print(f'Finished epoch {epochi+1}/{epochs}')

    # Generate a preview from one training batch (no separate validation
    # loader is available in this notebook).
    preview_batch = next(iter(train_loader), None)
    if preview_batch is not None:
        x = preview_batch[0].to(device)
        with torch.no_grad():  # inference only: do not build a graph
            predicted_images.append(gen(x).cpu())  # Move to CPU for storage

    return Disc_loss_final, Gen_loss_final, disc, gen, predicted_images


In [None]:

# Run the full training loop and keep the per-epoch loss histories, both
# trained models and the stored generator predictions for the cells below.
Disc_loss, Gen_loss, disc, gen, predicted_images = train()










In [None]:
import matplotlib.pyplot as plt

# Convert the per-epoch losses (0-dim tensors or plain numbers) to floats.
# float() is idempotent, so re-running this cell is safe.
Disc_loss = [float(loss) for loss in Disc_loss]
Gen_loss = [float(loss) for loss in Gen_loss]

fig, ax = plt.subplots(1, 2, figsize=(10, 5))
ax[0].plot(Disc_loss, 'r^-')
ax[0].set_title('Discriminator Loss')
ax[0].set_xlabel('Epochs')
ax[0].set_ylabel('Loss')

ax[1].plot(Gen_loss, 'g*-')
ax[1].set_title('Generator Loss')
ax[1].set_xlabel('Epochs')
ax[1].set_ylabel('Loss')
plt.show()


# predicted_images is a list of batches; flatten it so several images can be
# shown even when only a single batch was stored.
preview_images = torch.cat(list(predicted_images), dim=0) if len(predicted_images) > 0 else []
num_images_to_display = min(4, len(preview_images))

if num_images_to_display > 0:
    # squeeze=False keeps `axes` indexable even for a single image.
    fig, axes = plt.subplots(1, num_images_to_display, figsize=(15, 5), squeeze=False)
    axes = axes[0]

    for i in range(num_images_to_display):
        # The generator ends in Tanh, so values lie in [-1, 1]; imshow expects
        # float RGB in [0, 1] and would otherwise clip everything below 0.
        image = (preview_images[i] * 0.5 + 0.5).clamp(0, 1)
        axes[i].imshow(image.permute(1, 2, 0).cpu().numpy())
        axes[i].axis('off')

    plt.show()
else:
    print('No predicted images to display.')