**Importing Libraries**

In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image
import numpy as np
import itertools
import torchvision.utils as vutils


**Hyperparameters**

In [2]:
IMG_SIZE = 256
BATCH_SIZE = 3  
LR = 0.0002
BETA1 = 0.5
NUM_EPOCHS = 200
LAMBDA_CYCLE = 10.0
LAMBDA_PERCEPTUAL = 5.0  
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

**Loading Data**

In [3]:
# Custom Dataset
class ImageDataset(Dataset):
    def __init__(self, photo_dir, monet_dir, transform=None):
        self.photo_dir = photo_dir
        self.monet_dir = monet_dir
        self.transform = transform
        self.photos = sorted(os.listdir(photo_dir))
        self.monets = sorted(os.listdir(monet_dir))

    def __len__(self):
        return max(len(self.photos), len(self.monets))

    def __getitem__(self, idx):
        photo_path = os.path.join(self.photo_dir, self.photos[idx % len(self.photos)])
        monet_path = os.path.join(self.monet_dir, self.monets[idx % len(self.monets)])

        photo = Image.open(photo_path).convert('RGB')
        monet = Image.open(monet_path).convert('RGB')

        if self.transform:
            photo = self.transform(photo)
            monet = self.transform(monet)

        return {'photo': photo, 'monet': monet}

**Defining Resnet Block, Generator and Discriminator**

In [4]:
# Generator (U-Net with ResNet blocks)
class ResNetBlock(nn.Module):
    def __init__(self, dim):
        super(ResNetBlock, self).__init__()
        self.conv_block = nn.Sequential(
            nn.Conv2d(dim, dim, 3, 1, 1, bias=False),
            nn.InstanceNorm2d(dim),
            nn.ReLU(True),
            nn.Conv2d(dim, dim, 3, 1, 1, bias=False),
            nn.InstanceNorm2d(dim),
        )

    def forward(self, x):
        return x + self.conv_block(x)


class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, 7, 1, 3, bias=False),
            nn.InstanceNorm2d(64),
            nn.ReLU(True),
            nn.Conv2d(64, 128, 3, 2, 1, bias=False),
            nn.InstanceNorm2d(128),
            nn.ReLU(True),
            nn.Conv2d(128, 256, 3, 2, 1, bias=False),
            nn.InstanceNorm2d(256),
            nn.ReLU(True),
            *[ResNetBlock(256) for _ in range(9)],
            nn.ConvTranspose2d(256, 128, 3, 2, 1, 1, bias=False),
            nn.InstanceNorm2d(128),
            nn.ReLU(True),
            nn.ConvTranspose2d(128, 64, 3, 2, 1, 1, bias=False),
            nn.InstanceNorm2d(64),
            nn.ReLU(True),
            nn.Conv2d(64, 3, 7, 1, 3, bias=False),
            nn.Tanh()
        )

    def forward(self, x):
        return self.model(x)

# Discriminator (PatchGAN)
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(3, 64, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(64, 128, 4, 2, 1, bias=False),
            nn.InstanceNorm2d(128),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(128, 256, 4, 2, 1, bias=False),
            nn.InstanceNorm2d(256),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(256, 512, 4, 1, 1, bias=False),
            nn.InstanceNorm2d(512),
            nn.LeakyReLU(0.2, True),
            nn.Conv2d(512, 1, 4, 1, 1, bias=False),
        )

    def forward(self, x):
        return self.model(x)


**Weight Initialization and Image Buffer**

In [5]:
# Initialize weights
def weights_init(m):
    if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
        nn.init.normal_(m.weight.data, 0.0, 0.02)

# Image buffer
class ImageBuffer:
    def __init__(self, max_size=50):
        self.buffer = []
        self.max_size = max_size

    def push_and_pop(self, images):
        to_return = []
        for image in images:
            image = image.unsqueeze(0)
            if len(self.buffer) < self.max_size:
                self.buffer.append(image)
                to_return.append(image)
            else:
                if np.random.uniform(0, 1) > 0.5:
                    i = np.random.randint(0, self.max_size)
                    to_return.append(self.buffer[i].clone())
                    self.buffer[i] = image
                else:
                    to_return.append(image)
        return torch.cat(to_return, 0)

**Defining CycleGAN Model**

In [6]:
# Training function
def train_cycle_gan(photo_dir, monet_dir, output_dir, models_dir="models"):
    # Create separate directories
    os.makedirs(output_dir, exist_ok=True)    # For generated images
    os.makedirs(models_dir, exist_ok=True)    # For model checkpoints

    transform = transforms.Compose([
        transforms.Resize((IMG_SIZE + 30, IMG_SIZE + 30)),
        transforms.RandomCrop(IMG_SIZE),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    dataset = ImageDataset(photo_dir, monet_dir, transform=transform)
    dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4)

    # Initialize models
    G = Generator().to(DEVICE)
    F = Generator().to(DEVICE)
    D_X = Discriminator().to(DEVICE)
    D_Y = Discriminator().to(DEVICE)

    G.apply(weights_init)
    F.apply(weights_init)
    D_X.apply(weights_init)
    D_Y.apply(weights_init)

    # Optimizers with learning rate scheduling
    optimizer_G = optim.Adam(itertools.chain(G.parameters(), F.parameters()), lr=LR, betas=(BETA1, 0.999))
    optimizer_D_X = optim.Adam(D_X.parameters(), lr=LR, betas=(BETA1, 0.999))
    optimizer_D_Y = optim.Adam(D_Y.parameters(), lr=LR, betas=(BETA1, 0.999))
    lr_scheduler_G = optim.lr_scheduler.StepLR(optimizer_G, step_size=100, gamma=0.1)
    lr_scheduler_D_X = optim.lr_scheduler.StepLR(optimizer_D_X, step_size=100, gamma=0.1)
    lr_scheduler_D_Y = optim.lr_scheduler.StepLR(optimizer_D_Y, step_size=100, gamma=0.1)

    # Loss functions
    criterion_GAN = nn.MSELoss()
    criterion_cycle = nn.L1Loss()
    criterion_perceptual = nn.L1Loss()
    vgg = models.vgg16(pretrained=True).features[:16].to(DEVICE).eval()  

    # Image buffers
    fake_photo_buffer = ImageBuffer()
    fake_monet_buffer = ImageBuffer()

    # Training loop
    for epoch in range(NUM_EPOCHS):
        for i, batch in enumerate(dataloader):
            real_photo = batch['photo'].to(DEVICE)
            real_monet = batch['monet'].to(DEVICE)

            # Ground truth for GAN loss
            real_label = torch.ones((real_photo.size(0), 1, 30, 30), device=DEVICE)
            fake_label = torch.zeros((real_photo.size(0), 1, 30, 30), device=DEVICE)

            # --- Train Generators ---
            optimizer_G.zero_grad()

            # Generate fake images
            fake_monet = G(real_photo)
            fake_photo = F(real_monet)

            # Reconstruct images
            rec_photo = F(fake_monet)
            rec_monet = G(fake_photo)

            # Adversarial losses
            loss_G = criterion_GAN(D_Y(fake_monet), real_label)
            loss_F = criterion_GAN(D_X(fake_photo), real_label)

            # Cycle losses
            loss_cycle_photo = criterion_cycle(rec_photo, real_photo)
            loss_cycle_monet = criterion_cycle(rec_monet, real_monet)

            # Perceptual losses
            with torch.no_grad():
                vgg_real_photo = vgg(real_photo)
                vgg_real_monet = vgg(real_monet)
            loss_perceptual_photo = criterion_perceptual(vgg(rec_photo), vgg_real_photo)
            loss_perceptual_monet = criterion_perceptual(vgg(rec_monet), vgg_real_monet)

            # Total generator loss
            total_loss_G = (loss_G + loss_F +
                           LAMBDA_CYCLE * (loss_cycle_photo + loss_cycle_monet) +
                           LAMBDA_PERCEPTUAL * (loss_perceptual_photo + loss_perceptual_monet))
            total_loss_G.backward()
            optimizer_G.step()

            # --- Train Discriminators ---
            # Photo discriminator
            optimizer_D_X.zero_grad()
            real_loss_D_X = criterion_GAN(D_X(real_photo), real_label)
            fake_photo_buffered = fake_photo_buffer.push_and_pop(fake_photo)
            fake_loss_D_X = criterion_GAN(D_X(fake_photo_buffered.detach()), fake_label)
            loss_D_X = (real_loss_D_X + fake_loss_D_X) * 0.5
            loss_D_X.backward()
            optimizer_D_X.step()

            # Monet discriminator
            optimizer_D_Y.zero_grad()
            real_loss_D_Y = criterion_GAN(D_Y(real_monet), real_label)
            fake_monet_buffered = fake_monet_buffer.push_and_pop(fake_monet)
            fake_loss_D_Y = criterion_GAN(D_Y(fake_monet_buffered.detach()), fake_label)
            loss_D_Y = (real_loss_D_Y + fake_loss_D_Y) * 0.5
            loss_D_Y.backward()
            optimizer_D_Y.step()

            # Print all losses
            if i % 100 == 0:
                print(f"Epoch [{epoch}/{NUM_EPOCHS}] Batch [{i}/{len(dataloader)}]")
                print(f"Discriminator Losses:")
                print(f"  D_X Real: {real_loss_D_X.item():.4f}, D_X Fake: {fake_loss_D_X.item():.4f}, D_X Total: {loss_D_X.item():.4f}")
                print(f"  D_Y Real: {real_loss_D_Y.item():.4f}, D_Y Fake: {fake_loss_D_Y.item():.4f}, D_Y Total: {loss_D_Y.item():.4f}")
                print(f"Generator Losses:")
                print(f"  G Adv: {loss_G.item():.4f}, F Adv: {loss_F.item():.4f}")
                print(f"  Cycle Photo: {loss_cycle_photo.item():.4f}, Cycle Monet: {loss_cycle_monet.item():.4f}")
                print(f"  Perceptual Photo: {loss_perceptual_photo.item():.4f}, Perceptual Monet: {loss_perceptual_monet.item():.4f}")
                print(f"  Total G Loss: {total_loss_G.item():.4f}")

        # Learning rate step
        lr_scheduler_G.step()
        lr_scheduler_D_X.step()
        lr_scheduler_D_Y.step()


    # Save model checkpoints every 20 epochs
        if epoch % 20 == 0 or epoch == NUM_EPOCHS - 1:
            torch.save({
                'epoch': epoch,
                'G_state_dict': G.state_dict(),
                'F_state_dict': F.state_dict(),
                'D_X_state_dict': D_X.state_dict(),
                'D_Y_state_dict': D_Y.state_dict(),
                'optimizer_G_state_dict': optimizer_G.state_dict(),
                'optimizer_D_X_state_dict': optimizer_D_X.state_dict(),
                'optimizer_D_Y_state_dict': optimizer_D_Y.state_dict(),
            }, f"{models_dir}/checkpoint_epoch_{epoch}.pth")
            print(f"Saved checkpoint at epoch {epoch}")

    # Save final models
    torch.save(G.state_dict(), f"{output_dir}/G.pth")
    torch.save(F.state_dict(), f"{output_dir}/F.pth")





**Defining Image Generation Function**

In [7]:
def generate_submission(photo_dir, submission_dir, model_path):
    transform = transforms.Compose([
        transforms.Resize((286, 286)),
        transforms.RandomCrop((256, 256)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])

    G = Generator().to(DEVICE)
    checkpoint = torch.load(model_path)
    G.load_state_dict(checkpoint['G_state_dict'])
    G.eval()

    os.makedirs(submission_dir, exist_ok=True)
    photos = sorted(os.listdir(photo_dir))

    with torch.no_grad():
     for i, photo_name in enumerate(photos[:1500]):  # Limit to first 1500 images
         photo_path = os.path.join(photo_dir, photo_name)
         photo = Image.open(photo_path).convert('RGB')
         photo = transform(photo).unsqueeze(0).to(DEVICE)

         fake_monet = G(photo)
         fake_monet = (fake_monet * 0.5 + 0.5).clamp(0, 1)  # Denormalize
         img = transforms.ToPILImage()(fake_monet.squeeze(0).cpu()).convert('RGB')
         img.save(f"{submission_dir}/image_{i:04d}.jpg", 'JPEG', quality=95)




**Model Training**

In [8]:
if __name__ == "__main__":
    PHOTO_DIR = "photo"
    MONET_DIR = "monet"
    OUTPUT_DIR = "Final_models"  # For final model
    MODELS_DIR = "models"           # For model checkpoints
    SUBMISSION_DIR = "submission"   # For final submission images

    train_cycle_gan(PHOTO_DIR, MONET_DIR, OUTPUT_DIR, MODELS_DIR)
    generate_submission(PHOTO_DIR, SUBMISSION_DIR, f"{MODELS_DIR}/checkpoint_epoch_{NUM_EPOCHS-1}.pth")




Epoch [0/200] Batch [0/2346]
Discriminator Losses:
  D_X Real: 2.5162, D_X Fake: 1.4217, D_X Total: 1.9689
  D_Y Real: 3.7201, D_Y Fake: 1.7277, D_Y Total: 2.7239
Generator Losses:
  G Adv: 3.6381, F Adv: 2.7290
  Cycle Photo: 0.5885, Cycle Monet: 0.4899
  Perceptual Photo: 0.8593, Perceptual Monet: 0.7638
  Total G Loss: 25.2676
Epoch [0/200] Batch [100/2346]
Discriminator Losses:
  D_X Real: 0.2482, D_X Fake: 0.2374, D_X Total: 0.2428
  D_Y Real: 0.3509, D_Y Fake: 0.1672, D_Y Total: 0.2590
Generator Losses:
  G Adv: 0.4380, F Adv: 0.3347
  Cycle Photo: 0.2553, Cycle Monet: 0.2322
  Perceptual Photo: 0.6317, Perceptual Monet: 0.4966
  Total G Loss: 11.2887
Epoch [0/200] Batch [200/2346]
Discriminator Losses:
  D_X Real: 0.1693, D_X Fake: 0.2272, D_X Total: 0.1982
  D_Y Real: 0.2334, D_Y Fake: 0.2788, D_Y Total: 0.2561
Generator Losses:
  G Adv: 0.3360, F Adv: 0.3305
  Cycle Photo: 0.3167, Cycle Monet: 0.2567
  Perceptual Photo: 0.5626, Perceptual Monet: 0.6078
  Total G Loss: 12.2527
