In [30]:
import os
import numpy as np
import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
from PIL import Image
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

In [None]:
# Upload kaggle.json (the Kaggle API credentials file) through the Colab file picker.
# NOTE(review): the recorded output shows the upload being saved as
# "kaggle (2).json", so the `mv` below presumably moved an older kaggle.json
# that was already in the working directory - confirm on a fresh runtime.
from google.colab import files
files.upload()

# Move kaggle.json to the appropriate directory
!mkdir -p ~/.kaggle/
!mv kaggle.json ~/.kaggle/
# Make the credentials file readable/writable by the owner only
!chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle (2).json


In [31]:
# Sanity check: list ~/.kaggle/ to confirm kaggle.json was moved there
!ls ~/.kaggle/

kaggle.json


In [32]:
# Smoke-test the Kaggle CLI by listing datasets
!kaggle datasets list

ref                                                          title                                          size  lastUpdated          downloadCount  voteCount  usabilityRating  
-----------------------------------------------------------  --------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
bhadramohit/customer-shopping-latest-trends-dataset          Customer Shopping (Latest Trends) Dataset      76KB  2024-11-23 15:26:12           3978         76  1.0              
hopesb/student-depression-dataset                            Student Depression Dataset.                   454KB  2024-11-22 17:56:03           1741         30  0.9411765        
ikynahidwin/depression-student-dataset                       Depression Student Dataset                      4KB  2024-11-20 06:42:01           3516         72  1.0              
steve1215rogg/student-lifestyle-dataset                      student lifestyle dataset                   

In [36]:
!mkdir -p ./data/van-gogh-paintings
!mkdir -p ./data/monet

# Download Van Gogh dataset
!kaggle datasets download -d ipythonx/van-gogh-paintings
!unzip van-gogh-paintings.zip -d ./data/van-gogh-paintings

# Download Monet dataset
!kaggle datasets download -d balraj98/monet2photo
!unzip monet2photo.zip -d ./data/monet

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: ./data/monet/trainB/2015-02-12 11_31_32.jpg  
  inflating: ./data/monet/trainB/2015-02-12 12_30_32.jpg  
  inflating: ./data/monet/trainB/2015-02-12 19_51_27.jpg  
  inflating: ./data/monet/trainB/2015-02-12 20_53_44.jpg  
  inflating: ./data/monet/trainB/2015-02-12 21_31_48.jpg  
  inflating: ./data/monet/trainB/2015-02-13 09_22_26.jpg  
  inflating: ./data/monet/trainB/2015-02-13 10_07_16.jpg  
  inflating: ./data/monet/trainB/2015-02-13 13_57_11.jpg  
  inflating: ./data/monet/trainB/2015-02-13 16_02_50.jpg  
  inflating: ./data/monet/trainB/2015-02-13 16_45_49.jpg  
  inflating: ./data/monet/trainB/2015-02-14 00_09_27.jpg  
  inflating: ./data/monet/trainB/2015-02-14 02_21_28.jpg  
  inflating: ./data/monet/trainB/2015-02-14 02_26_30.jpg  
  inflating: ./data/monet/trainB/2015-02-14 10_38_14.jpg  
  inflating: ./data/monet/trainB/2015-02-14 12_37_48.jpg  
  inflating: ./data/monet/trainB/2015-02-14 17_22_

In [39]:
# Preprocessing applied to every image: resize to 64x64, convert to a tensor,
# then normalise with mean 0.5 and std 0.5.
data_transform = transforms.Compose(
    [
        transforms.Resize(size=(64, 64)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5,), std=(0.5,)),
    ]
)

# CelebA loading is currently disabled:
# celeba_dataset = datasets.CelebA(root='./data/celeba', split='train', transform=data_transform, download=True)
# celeba_loader = DataLoader(celeba_dataset, batch_size=64, shuffle=True)

# CIFAR-10 training split, downloaded on first use into ./data/cifar10
cifar10_dataset = datasets.CIFAR10(
    root='./data/cifar10',
    train=True,
    transform=data_transform,
    download=True,
)
cifar10_loader = DataLoader(dataset=cifar10_dataset, batch_size=64, shuffle=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar10/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:05<00:00, 29.6MB/s]


Extracting ./data/cifar10/cifar-10-python.tar.gz to ./data/cifar10


In [None]:
# Opens the Colab upload dialog again (uses `files` imported from google.colab above).
# NOTE(review): no later cell visibly uses the uploaded files - confirm this cell is still needed.
files.upload()

In [None]:
# Monet Dataset
# Same preprocessing as the CIFAR-10 pipeline: 64x64 resize, tensor conversion,
# and normalisation with mean 0.5 / std 0.5 (maps [0, 1] pixels to [-1, 1]).
monet_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])

# NOTE(review): the unzip output shows sub-folders such as trainB/ under
# ./data/monet; ImageFolder treats every sub-folder as a class and loads all of
# them - confirm which sub-folder(s) should actually feed the Monet domain.
monet_dataset = datasets.ImageFolder(root='./data/monet', transform=monet_transform)
monet_loader = DataLoader(monet_dataset, batch_size=64, shuffle=True)

# Van Gogh Dataset
# The root must be the directory the archive was extracted into
# (./data/van-gogh-paintings); './data/vangogh' is never created.
vangogh_dataset = datasets.ImageFolder(root='./data/van-gogh-paintings', transform=monet_transform)
vangogh_loader = DataLoader(vangogh_dataset, batch_size=64, shuffle=True)

In [None]:
def show_images(loader):
    """Display the first 16 images of one batch from ``loader`` as a 4x4 grid.

    Parameters:
        loader: an iterable of ``(images, labels)`` batches; only the images of
            the first batch are used.
    """
    # Use the built-in next(): DataLoader iterators have no .next() method on
    # current PyTorch releases.
    images, _ = next(iter(loader))
    images = images * 0.5 + 0.5  # Denormalize (inverse of Normalize(0.5, 0.5))
    grid = torchvision.utils.make_grid(images[:16], nrow=4)
    plt.figure(figsize=(8, 8))
    # make_grid returns (C, H, W); imshow expects (H, W, C)
    plt.imshow(grid.permute(1, 2, 0))
    plt.axis('off')
    plt.show()

# The CelebA loader is commented out above, so preview the CIFAR-10 loader
# that is actually defined.
print("Sample CIFAR-10 Images:")
show_images(cifar10_loader)

print("Sample Monet Images:")
show_images(monet_loader)

### **Implement Progressive CycleGAN**
#### We will implement the Progressive CycleGAN by integrating the concepts of CycleGAN and Progressive Growing GAN.
### **Generator Architecture**
#### The generator consists of:
- Downsampling layers: To capture low-resolution features.
- Residual blocks: For feature transformation.
- Upsampling layers: To reconstruct high-resolution images.
- Progressive Layers: New layers added as resolution increases.

In [None]:
import torch.nn as nn

class ResidualBlock(nn.Module):
    """Residual unit: conv -> instance norm -> ReLU -> conv -> instance norm, plus a skip connection.

    The number of channels and the spatial size are preserved (3x3 convolutions
    with padding 1), so the block output can be added to its input.
    """

    def __init__(self, channels):
        super().__init__()
        conv_options = dict(kernel_size=3, padding=1)
        stages = [
            nn.Conv2d(channels, channels, **conv_options),
            nn.InstanceNorm2d(channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(channels, channels, **conv_options),
            nn.InstanceNorm2d(channels),
        ]
        self.block = nn.Sequential(*stages)

    def forward(self, x):
        # Identity shortcut around the convolutional branch
        residual = self.block(x)
        return x + residual

In [None]:
class Generator(nn.Module):
    """Encoder / residual / decoder generator.

    Layout: a 7x7 stem, a stack of stride-2 downsampling stages, ``n_blocks``
    residual blocks, a mirrored stack of stride-2 upsampling stages and a 7x7
    output convolution followed by Tanh.

    Parameters:
        input_nc (int): Number of channels of the input image.
        output_nc (int): Number of channels of the generated image.
        ngf (int): Channel width after the stem; doubled by every downsampling stage.
        n_blocks (int): Number of residual blocks in the bottleneck.
        resolution (int): Controls the depth: one downsampling stage is added
            for every doubling needed to go from 4 up to ``resolution``
            (so ``resolution=4`` builds no down/up stages at all).
    """

    def __init__(self, input_nc, output_nc, ngf=64, n_blocks=6, resolution=64):
        super(Generator, self).__init__()
        self.init_resolution = 4  # Start from 4x4 images
        self.ngf = ngf
        self.n_blocks = n_blocks
        self.resolution = resolution

        # Initial convolutional block
        self.init_block = nn.Sequential(
            nn.Conv2d(input_nc, ngf, kernel_size=7, padding=3),
            nn.InstanceNorm2d(ngf),
            nn.ReLU(inplace=True)
        )

        # Downsampling layers: each stage halves H/W and doubles the channels
        self.down_blocks = nn.ModuleList()
        channels = ngf
        res = self.init_resolution
        while res < resolution:
            self.down_blocks.append(
                nn.Sequential(
                    nn.Conv2d(channels, channels * 2, kernel_size=3, stride=2, padding=1),
                    nn.InstanceNorm2d(channels * 2),
                    nn.ReLU(inplace=True)
                )
            )
            channels *= 2
            res *= 2

        # Residual blocks
        self.res_blocks = nn.Sequential(*[ResidualBlock(channels) for _ in range(n_blocks)])

        # Upsampling layers: mirror the downsampling stack back to the stem
        # width. The stop condition uses the ``ngf`` argument (not a fixed 64)
        # so the number of up stages always equals the number of down stages
        # and the output has the same spatial size as the input.
        self.up_blocks = nn.ModuleList()
        while channels > ngf:
            self.up_blocks.append(
                nn.Sequential(
                    nn.ConvTranspose2d(channels, channels // 2, kernel_size=3, stride=2, padding=1, output_padding=1),
                    nn.InstanceNorm2d(channels // 2),
                    nn.ReLU(inplace=True)
                )
            )
            channels = channels // 2

        # Output layer
        self.output_block = nn.Sequential(
            nn.Conv2d(channels, output_nc, kernel_size=7, padding=3),
            nn.Tanh()
        )

    def forward(self, x):
        """Translate a batch of images; the output lies in [-1, 1] because of the final Tanh."""
        x = self.init_block(x)
        for down in self.down_blocks:
            x = down(x)
        x = self.res_blocks(x)
        for up in self.up_blocks:
            x = up(x)
        x = self.output_block(x)
        return x

### **Discriminator Architecture**
#### The discriminator uses PatchGAN and adapts progressively.

In [None]:
class Discriminator(nn.Module):
    """Convolutional discriminator that outputs a map of real/fake scores.

    Parameters:
        input_nc (int): Number of channels of the input image.
        ndf (int): Channel width of the first convolution; doubled by every
            additional stage.
        resolution (int): Controls the depth: one extra stride-2 stage is added
            for every doubling needed to go from 4 up to ``resolution``.
    """

    def __init__(self, input_nc, ndf=64, resolution=64):
        super().__init__()
        self.init_resolution = 4
        self.ndf = ndf
        self.resolution = resolution

        # Stem: stride-2 convolution without normalisation
        stages = [
            nn.Conv2d(input_nc, ndf, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        ]

        # One conv / instance-norm / LeakyReLU stage per resolution doubling
        width = ndf
        current_res = self.init_resolution
        while current_res < resolution:
            doubled = width * 2
            stages.extend((
                nn.Conv2d(width, doubled, kernel_size=4, stride=2, padding=1),
                nn.InstanceNorm2d(doubled),
                nn.LeakyReLU(0.2, inplace=True),
            ))
            width = doubled
            current_res *= 2

        # Single-channel score head (no activation)
        stages.append(nn.Conv2d(width, 1, kernel_size=4, padding=1))

        self.model = nn.Sequential(*stages)

    def forward(self, x):
        scores = self.model(x)
        return scores

### **Progressive Growing Mechanism**
#### We'll define functions to progressively grow the networks.

In [None]:
def grow_generator(generator, new_resolution):
    """
    Add layers to the generator to handle new resolution.

    For every doubling between ``generator.resolution`` and ``new_resolution``
    one stride-2 downsampling stage is inserted right after the stem (front of
    ``generator.down_blocks``) and one stride-2 upsampling stage is appended
    right before the output head (end of ``generator.up_blocks``). Because the
    new stages live in the containers that ``forward`` iterates over, they are
    actually executed, and every existing layer keeps its weights.

    Parameters:
        generator (nn.Module): The current generator model. It must expose
            ``resolution``, ``init_block``, ``down_blocks``, ``up_blocks`` and
            ``output_block`` as the Generator class in this notebook does.
        new_resolution (int): The new resolution for the image (e.g., 8, 16, 32, etc.).

    Returns:
        generator (nn.Module): The same generator object, grown in place.
            It is returned unchanged when ``new_resolution`` is not larger than
            the current resolution.

    Raises:
        ValueError: If ``new_resolution`` cannot be reached from the current
            resolution by repeated doubling.
    """
    current = generator.resolution
    if new_resolution <= current:
        return generator
    if current < 1:
        raise ValueError(f"generator.resolution must be positive, got {current}")

    # Count how many x2 steps separate the two resolutions
    steps = 0
    reached = current
    while reached < new_resolution:
        reached *= 2
        steps += 1
    if reached != new_resolution:
        raise ValueError(
            f"new_resolution={new_resolution} is not reachable from {current} by doubling"
        )

    # Channel widths at the high-resolution ends of the network: the new
    # stages must accept what the stem emits and emit what the head accepts.
    stem_channels = generator.init_block[0].out_channels
    head_channels = generator.output_block[0].in_channels

    # Keep new layers on the same device as the existing ones
    reference = next(generator.parameters(), None)

    for _ in range(steps):
        down = nn.Sequential(
            nn.Conv2d(stem_channels, stem_channels, kernel_size=3, stride=2, padding=1),
            nn.InstanceNorm2d(stem_channels),
            nn.ReLU(inplace=True)
        )
        up = nn.Sequential(
            nn.ConvTranspose2d(head_channels, head_channels, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.InstanceNorm2d(head_channels),
            nn.ReLU(inplace=True)
        )
        if reference is not None:
            down.to(reference.device)
            up.to(reference.device)
        generator.down_blocks.insert(0, down)
        generator.up_blocks.append(up)

    generator.resolution = new_resolution
    return generator

def grow_discriminator(discriminator, new_resolution):
    """
    Add layers to the discriminator to handle new resolution.

    For every doubling between ``discriminator.resolution`` and
    ``new_resolution`` one stride-2 conv / instance-norm / LeakyReLU stage is
    spliced into ``discriminator.model`` directly after the stem (first
    convolution + activation). The Sequential is rebuilt from the existing
    layer objects, so previously trained layers keep their weights and the new
    stage is really executed by ``forward``. No Sigmoid is appended: the score
    head stays linear.

    Parameters:
        discriminator (nn.Module): The current discriminator model. It must
            expose ``resolution`` and a ``model`` Sequential whose first two
            entries are the stem convolution and its activation.
        new_resolution (int): The new resolution for the image (e.g., 8, 16, 32, etc.).

    Returns:
        discriminator (nn.Module): The same discriminator object, grown in
            place. It is returned unchanged when ``new_resolution`` is not
            larger than the current resolution.

    Raises:
        ValueError: If ``new_resolution`` cannot be reached from the current
            resolution by repeated doubling.
    """
    current = discriminator.resolution
    if new_resolution <= current:
        return discriminator
    if current < 1:
        raise ValueError(f"discriminator.resolution must be positive, got {current}")

    # Count how many x2 steps separate the two resolutions
    steps = 0
    reached = current
    while reached < new_resolution:
        reached *= 2
        steps += 1
    if reached != new_resolution:
        raise ValueError(
            f"new_resolution={new_resolution} is not reachable from {current} by doubling"
        )

    layers = list(discriminator.model.children())
    stem, tail = layers[:2], layers[2:]
    # New stages keep the stem width so the first pre-existing layer after
    # them still receives the channel count it was built for.
    width = stem[0].out_channels
    reference = next(discriminator.parameters(), None)

    new_stages = []
    for _ in range(steps):
        stage = [
            nn.Conv2d(width, width, kernel_size=4, stride=2, padding=1),
            nn.InstanceNorm2d(width),
            nn.LeakyReLU(0.2, inplace=True)
        ]
        if reference is not None:
            stage = [layer.to(reference.device) for layer in stage]
        new_stages.extend(stage)

    discriminator.model = nn.Sequential(*(stem + new_stages + tail))
    discriminator.resolution = new_resolution
    return discriminator

### **CycleGAN Loss Functions**

In [None]:
# Cycle-consistency and identity criteria: both are mean absolute error,
# kept as two separate module instances.
cycle_loss, identity_loss = nn.L1Loss(), nn.L1Loss()

# Adversarial criterion: mean squared error
adversarial_loss = nn.MSELoss(reduction='mean')

### **Training**
#### We'll train the model progressively, increasing the image resolution at predefined stages.

In [None]:
import itertools

def train(cycle_gan, data_loaders, epochs_per_stage, resolutions):
    """Train the CycleGAN progressively, one stage per entry in ``resolutions``.

    Each stage grows all four networks to the stage resolution, rebuilds the
    optimizers (so that parameters added by growing are optimised too), and
    then runs ``epochs_per_stage`` epochs on batches resized to that resolution.

    Parameters:
        cycle_gan (tuple): ``(G_AB, G_BA, D_A, D_B)`` - the two generators and
            the two discriminators.
        data_loaders (dict): Loaders under the keys ``'A'`` and ``'B'``; every
            batch is unpacked as ``(images, labels)`` and the labels are ignored.
        epochs_per_stage (int): Number of epochs to run at each resolution.
        resolutions (iterable of int): Stage resolutions in increasing order.
    """
    G_AB, G_BA, D_A, D_B = cycle_gan
    lambda_cycle = 10.0     # weight of the cycle-consistency term
    lambda_identity = 5.0   # weight of the identity term

    def to_stage(batch, device, res):
        """Move a batch to ``device`` and resize it to ``res`` x ``res`` if needed."""
        batch = batch.to(device)
        if batch.shape[-2] != res or batch.shape[-1] != res:
            batch = torch.nn.functional.interpolate(
                batch, size=(res, res), mode='bilinear', align_corners=False
            )
        return batch

    def discriminator_loss(net, real, fake):
        """Real samples are pushed towards 1, generated samples towards 0."""
        pred_real = net(real)
        pred_fake = net(fake)
        return (adversarial_loss(pred_real, torch.ones_like(pred_real))
                + adversarial_loss(pred_fake, torch.zeros_like(pred_fake)))

    for res in resolutions:
        print(f"Training at resolution: {res}x{res}")
        # Grow networks to new resolution
        grow_generator(G_AB, res)
        grow_generator(G_BA, res)
        grow_discriminator(D_A, res)
        grow_discriminator(D_B, res)

        # Build the optimizers after growing: an optimizer created earlier
        # would not know about parameters added for this stage.
        optimizer_G = torch.optim.Adam(itertools.chain(G_AB.parameters(), G_BA.parameters()), lr=0.0002, betas=(0.5, 0.999))
        optimizer_D = torch.optim.Adam(itertools.chain(D_A.parameters(), D_B.parameters()), lr=0.0002, betas=(0.5, 0.999))

        device = next(G_AB.parameters()).device
        for net in (G_AB, G_BA, D_A, D_B):
            net.train()

        for epoch in range(epochs_per_stage):
            loss_G = loss_D = None
            for (real_A, _), (real_B, _) in zip(data_loaders['A'], data_loaders['B']):
                real_A = to_stage(real_A, device, res)
                real_B = to_stage(real_B, device, res)

                # ---- Generators ----
                optimizer_G.zero_grad()
                fake_B = G_AB(real_A)
                fake_A = G_BA(real_B)
                pred_fake_B = D_B(fake_B)
                pred_fake_A = D_A(fake_A)
                loss_adv = (adversarial_loss(pred_fake_B, torch.ones_like(pred_fake_B))
                            + adversarial_loss(pred_fake_A, torch.ones_like(pred_fake_A)))
                # A -> B -> A and B -> A -> B should reproduce the input
                loss_cyc = cycle_loss(G_BA(fake_B), real_A) + cycle_loss(G_AB(fake_A), real_B)
                # A generator fed an image of its target domain should leave it unchanged
                loss_idt = identity_loss(G_BA(real_A), real_A) + identity_loss(G_AB(real_B), real_B)
                loss_G = loss_adv + lambda_cycle * loss_cyc + lambda_identity * loss_idt
                loss_G.backward()
                optimizer_G.step()

                # ---- Discriminators ----
                optimizer_D.zero_grad()
                # detach(): the discriminator update must not back-propagate into the generators
                loss_D = 0.5 * (discriminator_loss(D_A, real_A, fake_A.detach())
                                + discriminator_loss(D_B, real_B, fake_B.detach()))
                loss_D.backward()
                optimizer_D.step()

            if loss_G is not None:
                print(f"  [{res}x{res}] epoch {epoch + 1}/{epochs_per_stage} "
                      f"loss_G: {loss_G.item():.4f} loss_D: {loss_D.item():.4f}")

### **Initialize Models and Data Loaders**

In [None]:
# Image channel counts for domain A (input) and domain B (output)
input_nc = 3
output_nc = 3
# Progressive schedule: networks are built at 4x4 and grown through `resolutions`
initial_resolution = 4
final_resolution = 64
resolutions = [8, 16, 32, 64]
epochs_per_stage = 5

# Build the networks directly on the accelerator when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

G_AB = Generator(input_nc, output_nc, resolution=initial_resolution).to(device)
G_BA = Generator(output_nc, input_nc, resolution=initial_resolution).to(device)
D_A = Discriminator(input_nc, resolution=initial_resolution).to(device)
D_B = Discriminator(output_nc, resolution=initial_resolution).to(device)

cycle_gan = (G_AB, G_BA, D_A, D_B)

# Domain A uses CIFAR-10: it is the dataset actually loaded above (the CelebA
# cell is commented out, so `celeba_dataset` does not exist).
data_loaders = {
    'A': DataLoader(cifar10_dataset, batch_size=64, shuffle=True),
    'B': DataLoader(monet_dataset, batch_size=64, shuffle=True)
}

In [None]:
# Run the progressive schedule: `train` iterates over every entry in `resolutions`
train(cycle_gan, data_loaders, epochs_per_stage, resolutions)

### **Evaluation**
#### We'll use the Fréchet Inception Distance (FID) and the Inception Score (IS) to evaluate the model.

In [None]:
from fid_score import calculate_fid

def compute_fid(model, data_loader, device, max_batches=None):
    """Compute the FID between translated A->B images and real B images.

    FID measures how close the generated distribution is to the *target*
    domain, so the generated images ``model(real_A)`` are compared against
    real images drawn from ``data_loader['B']`` (not against their own inputs).

    Parameters:
        model (nn.Module): Generator translating domain A to domain B. It is
            switched to eval mode.
        data_loader (dict): Loaders under the keys ``'A'`` and ``'B'`` yielding
            ``(images, labels)`` batches.
        device (torch.device): Device the model lives on.
        max_batches (int, optional): Upper bound on the number of batches read
            from each loader; ``None`` (default) uses every batch.

    Returns:
        The value returned by ``calculate_fid(real_B_images, fake_B_images)``.

    Raises:
        ValueError: If either loader yields no batches.
    """
    model.eval()
    real_images = []
    fake_images = []
    with torch.no_grad():
        for index, (real_A, _) in enumerate(data_loader['A']):
            if max_batches is not None and index >= max_batches:
                break
            fake_B = model(real_A.to(device))
            fake_images.append(fake_B.cpu())
        for index, (real_B, _) in enumerate(data_loader['B']):
            if max_batches is not None and index >= max_batches:
                break
            real_images.append(real_B.cpu())
    if not real_images or not fake_images:
        raise ValueError("compute_fid needs at least one batch from both loaders")
    fid_value = calculate_fid(torch.cat(real_images), torch.cat(fake_images))
    return fid_value

In [None]:
from torchvision.models.inception import inception_v3

def compute_inception_score(images):
    """Placeholder for the Inception Score computation.

    Not implemented yet: the body is a bare ``pass``, so the function ignores
    ``images`` and returns ``None``.
    """
    # Implementation of Inception Score computation
    pass  # Placeholder for the actual code

In [None]:
# Use the GPU when PyTorch can see one, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
G_AB = G_AB.to(device)

# FID of the A -> B generator over the loaders defined above
fid_value = compute_fid(G_AB, data_loaders, device)
print(f"FID Score: {fid_value}")

### **Analysis**
- Translation Quality: Visually inspect the translated images at each resolution stage.
- Stability: Observe training losses and convergence behavior.
- Comparison with Standard CycleGAN: Train a standard CycleGAN model and compare the results.

In [None]:
def translate_and_show(model, data_loader, device):
    """Translate one batch with ``model`` and plot inputs and outputs in a single grid.

    The first batch of ``data_loader`` is pushed through ``model`` (in eval
    mode, without gradients); the input images followed by the translated
    images are then drawn in a grid with 8 images per row.
    """
    model.eval()
    source_batch, _ = next(iter(data_loader))
    source_batch = source_batch.to(device)
    with torch.no_grad():
        translated_batch = model(source_batch)

    # Undo the 0.5 / 0.5 normalisation and bring both batches back to the CPU,
    # inputs first, translations second
    panels = [(batch * 0.5 + 0.5).cpu() for batch in (source_batch, translated_batch)]
    stacked = torch.cat(panels, 0)

    grid = torchvision.utils.make_grid(stacked, nrow=8)

    # (C, H, W) -> (H, W, C) for matplotlib
    plt.figure(figsize=(16, 8))
    plt.imshow(grid.permute(1, 2, 0))
    plt.axis('off')
    plt.show()

# Usage Example
translate_and_show(G_AB, data_loaders['A'], device)

### **Compare with Standard CycleGAN**
- Train a standard CycleGAN without progressive growing.
- Evaluate using the same metrics.
- Compare FID and Inception Scores.

First, we need to train a standard CycleGAN model. To do so, we will use the CycleGAN implementation available in the original repository or write our own custom model. Here's a simplified version of the CycleGAN training loop without progressive growing.

In [None]:
# Define the CycleGAN architecture without progressive growing
#
# The baseline reuses the Generator and Discriminator classes defined earlier
# in the notebook: built once at a fixed depth and never grown, they are a
# plain (non-progressive) CycleGAN. Declaring new empty classes under the same
# names would shadow the working implementations and leave the optimizers
# below with an empty parameter list.
#
# resolution=16 yields two stride-2 stages (4 -> 8 -> 16). The networks are
# fully convolutional, so they still accept the 64x64 images from the loaders.
BASELINE_DEPTH_RESOLUTION = 16

# Initialize models and optimizers
generator_A_to_B = Generator(input_nc, output_nc, resolution=BASELINE_DEPTH_RESOLUTION).to(device)
generator_B_to_A = Generator(output_nc, input_nc, resolution=BASELINE_DEPTH_RESOLUTION).to(device)
discriminator_A = Discriminator(input_nc, resolution=BASELINE_DEPTH_RESOLUTION).to(device)
discriminator_B = Discriminator(output_nc, resolution=BASELINE_DEPTH_RESOLUTION).to(device)

# `torch.optim` is reachable through the top-level `torch` import
optimizer_G = torch.optim.Adam(list(generator_A_to_B.parameters()) + list(generator_B_to_A.parameters()), lr=0.0002, betas=(0.5, 0.999))
optimizer_D = torch.optim.Adam(list(discriminator_A.parameters()) + list(discriminator_B.parameters()), lr=0.0002, betas=(0.5, 0.999))

# Define the loss functions
# The discriminators end in a plain convolution (no Sigmoid), so their scores
# are unbounded; BCELoss requires inputs in [0, 1], MSE does not.
criterion_GAN = nn.MSELoss()
criterion_cycle = nn.L1Loss()

# Training loop for standard CycleGAN
def train_standard_cycle_gan(data_loader_A, data_loader_B, num_epochs=100):
    """Train the standard (non-progressive) CycleGAN baseline.

    Uses the module-level ``generator_A_to_B``, ``generator_B_to_A``,
    ``discriminator_A``, ``discriminator_B``, their optimizers, the two
    criteria and ``device``.

    Parameters:
        data_loader_A: Iterable of domain-A batches, either bare image tensors
            or ``(images, labels)`` pairs (labels are ignored).
        data_loader_B: Same for domain B.
        num_epochs (int): Number of passes over the zipped loaders.
    """
    lambda_cycle = 10  # weight of the cycle-consistency term

    def images_of(batch):
        """Return the image tensor of a batch, dropping labels if present."""
        return batch[0] if isinstance(batch, (list, tuple)) else batch

    def discriminator_loss(net, real, fake):
        """Real samples are pushed towards 1, generated samples towards 0."""
        pred_real = net(real)
        pred_fake = net(fake)
        # Targets take the shape of the discriminator output, not of the image
        return (criterion_GAN(pred_real, torch.ones_like(pred_real))
                + criterion_GAN(pred_fake, torch.zeros_like(pred_fake)))

    for epoch in range(num_epochs):
        loss_D = loss_G = None
        for i, (batch_A, batch_B) in enumerate(zip(data_loader_A, data_loader_B)):
            real_A = images_of(batch_A).to(device)
            real_B = images_of(batch_B).to(device)

            # Train Discriminators: each one judges its OWN domain, i.e.
            # discriminator_A sees real A vs. generated A (and likewise for B).
            optimizer_D.zero_grad()
            fake_B = generator_A_to_B(real_A).detach()
            fake_A = generator_B_to_A(real_B).detach()
            loss_D_A = discriminator_loss(discriminator_A, real_A, fake_A)
            loss_D_B = discriminator_loss(discriminator_B, real_B, fake_B)

            loss_D = (loss_D_A + loss_D_B) * 0.5
            loss_D.backward()
            optimizer_D.step()

            # Train Generators
            optimizer_G.zero_grad()

            fake_B = generator_A_to_B(real_A)
            fake_A = generator_B_to_A(real_B)
            pred_fake_B = discriminator_B(fake_B)
            pred_fake_A = discriminator_A(fake_A)

            # Adversarial term plus cycle consistency: A -> B -> A must return to A
            loss_G_A = (criterion_GAN(pred_fake_B, torch.ones_like(pred_fake_B))
                        + criterion_cycle(generator_B_to_A(fake_B), real_A) * lambda_cycle)
            # ... and B -> A -> B must return to B
            loss_G_B = (criterion_GAN(pred_fake_A, torch.ones_like(pred_fake_A))
                        + criterion_cycle(generator_A_to_B(fake_A), real_B) * lambda_cycle)

            loss_G = (loss_G_A + loss_G_B) * 0.5
            loss_G.backward()
            optimizer_G.step()

        # Nothing to report when the loaders produced no batches
        if loss_D is not None:
            print(f'Epoch [{epoch}/{num_epochs}], Loss D: {loss_D.item()}, Loss G: {loss_G.item()}')

# Example of how to train the model:
train_standard_cycle_gan(data_loaders['A'], data_loaders['B'])

We can use the Fréchet Inception Distance (FID) and Inception Score (IS) to evaluate the quality of the generated images.

In [None]:
import torch
import numpy as np
import torchvision.models as models
from scipy.linalg import sqrtm
from sklearn.metrics import pairwise_distances_argmin_min
from torchvision import transforms
from torchvision.datasets import CIFAR10

def _fid_activations(images, inception_model, device, batch_size):
    """Return pooled Inception features of ``images`` as a float64 NumPy array (N x features)."""
    # ImageNet statistics expected by the pretrained network (transform_input=False)
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(1, 3, 1, 1)
    chunks = []
    with torch.no_grad():
        for start in range(0, images.shape[0], batch_size):
            batch = images[start:start + batch_size].to(device, dtype=torch.float32)
            batch = (batch * 0.5 + 0.5).clamp(0.0, 1.0)  # Denormalize to [0, 1]
            # InceptionV3 is defined for 299x299 inputs
            batch = torch.nn.functional.interpolate(
                batch, size=(299, 299), mode='bilinear', align_corners=False
            )
            batch = (batch - mean) / std
            chunks.append(inception_model(batch).cpu().numpy())
    return np.concatenate(chunks, axis=0).astype(np.float64)


def calculate_fid(real_images, fake_images, batch_size=32, device=None):
    """Calculate the Fréchet Inception Distance (FID).

    Parameters:
        real_images (Tensor): ``(N, 3, H, W)`` images normalised to [-1, 1].
        fake_images (Tensor): ``(M, 3, H, W)`` images normalised to [-1, 1].
        batch_size (int): Number of images pushed through Inception at once.
        device (torch.device, optional): Device used for feature extraction;
            defaults to CUDA when available, otherwise the CPU.

    Returns:
        float: ``||mu_r - mu_f||^2 + Tr(C_r + C_f - 2 (C_r C_f)^(1/2))``
        computed on pooled InceptionV3 features.

    Raises:
        ValueError: If either set holds fewer than two images (a covariance
            cannot be estimated).
    """
    if real_images.shape[0] < 2 or fake_images.shape[0] < 2:
        raise ValueError("calculate_fid needs at least two real and two fake images")
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Use InceptionV3 model to extract features
    inception_model = models.inception_v3(pretrained=True, transform_input=False)
    # FID is defined on the pooled features, not on the 1000-way class logits:
    # replacing the classifier with an identity exposes them.
    inception_model.fc = torch.nn.Identity()
    inception_model = inception_model.to(device)
    inception_model.eval()

    real_features = _fid_activations(real_images, inception_model, device, batch_size)
    fake_features = _fid_activations(fake_images, inception_model, device, batch_size)

    # Compute the FID score
    mean_real = np.mean(real_features, axis=0)
    mean_fake = np.mean(fake_features, axis=0)
    cov_real = np.cov(real_features, rowvar=False)
    cov_fake = np.cov(fake_features, rowvar=False)

    mean_diff = mean_real - mean_fake
    cov_sqrt = sqrtm(cov_real.dot(cov_fake))
    if not np.isfinite(cov_sqrt).all():
        # Near-singular product (typical with few samples): regularise the diagonal
        offset = np.eye(cov_real.shape[0]) * 1e-6
        cov_sqrt = sqrtm((cov_real + offset).dot(cov_fake + offset))
    if np.iscomplexobj(cov_sqrt):
        # Numerical error can introduce a small imaginary component
        cov_sqrt = cov_sqrt.real

    fid = np.sum(mean_diff**2) + np.trace(cov_real) + np.trace(cov_fake) - 2 * np.trace(cov_sqrt)
    return float(fid)

def _inception_class_probabilities(images, inception_model, device, batch_size):
    """Return softmax class probabilities of ``images`` as a float64 NumPy array (N x classes)."""
    # ImageNet statistics expected by the pretrained network (transform_input=False)
    mean = torch.tensor([0.485, 0.456, 0.406], device=device).view(1, 3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=device).view(1, 3, 1, 1)
    chunks = []
    with torch.no_grad():
        for start in range(0, images.shape[0], batch_size):
            batch = images[start:start + batch_size].to(device, dtype=torch.float32)
            batch = (batch * 0.5 + 0.5).clamp(0.0, 1.0)  # Denormalize to [0, 1]
            # InceptionV3 is defined for 299x299 inputs
            batch = torch.nn.functional.interpolate(
                batch, size=(299, 299), mode='bilinear', align_corners=False
            )
            batch = (batch - mean) / std
            logits = inception_model(batch)
            chunks.append(torch.softmax(logits, dim=1).cpu().numpy())
    return np.concatenate(chunks, axis=0).astype(np.float64)


def calculate_inception_score(fake_images, batch_size=32, device=None):
    """Calculate the Inception Score (IS).

    ``IS = exp( mean_x KL( p(y|x) || p(y) ) )`` where ``p(y|x)`` are the
    softmax class probabilities of InceptionV3 and ``p(y)`` is their mean over
    all images.

    Parameters:
        fake_images (Tensor): ``(N, 3, H, W)`` images normalised to [-1, 1].
        batch_size (int): Number of images pushed through Inception at once.
        device (torch.device, optional): Device used for inference; defaults to
            CUDA when available, otherwise the CPU.

    Returns:
        float: The Inception Score (at least 1.0 up to rounding).

    Raises:
        ValueError: If ``fake_images`` is empty.
    """
    if fake_images.shape[0] == 0:
        raise ValueError("calculate_inception_score needs at least one image")
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Use InceptionV3 model for IS
    inception_model = models.inception_v3(pretrained=True, transform_input=False).to(device)
    inception_model.eval()

    # Per-image conditional distributions p(y|x)
    predictions = _inception_class_probabilities(fake_images, inception_model, device, batch_size)

    # Calculate Inception Score
    eps = 1e-12  # guards log(0)
    p_y = np.mean(predictions, axis=0, keepdims=True)  # marginal p(y)
    # KL(p(y|x) || p(y)) for every image: the conditional is compared with the
    # marginal, never the marginal with itself.
    kl_divergence = np.sum(predictions * (np.log(predictions + eps) - np.log(p_y + eps)), axis=1)
    is_score = np.exp(np.mean(kl_divergence))
    return float(is_score)

# Example of evaluating FID and IS for the standard CycleGAN model:
# NOTE: the tensors below are random noise (torch.randn) standing in for image
# batches, so the printed numbers only exercise the metric functions; they say
# nothing about any trained model.
real_images = torch.randn(64, 3, 256, 256).to(device)  # Example: 64 real images
fake_images = torch.randn(64, 3, 256, 256).to(device)  # Example: 64 fake images

fid_score = calculate_fid(real_images, fake_images)
inception_score = calculate_inception_score(fake_images)

print(f'FID Score: {fid_score}')
print(f'Inception Score: {inception_score}')

After training the Progressive CycleGAN and the Standard CycleGAN, you can use the following code to compare their FID and Inception scores.

In [None]:
# Hard-coded example values, not measured results.
# FID examples for the Progressive and the Standard CycleGAN
progressive_fid, standard_fid = 50.4, 60.2
# Inception Score examples for the Progressive and the Standard CycleGAN
progressive_is, standard_is = 8.2, 7.5

# Print out the comparison, one metric per line
print(
    f"Progressive CycleGAN FID Score: {progressive_fid}",
    f"Standard CycleGAN FID Score: {standard_fid}",
    f"Progressive CycleGAN Inception Score: {progressive_is}",
    f"Standard CycleGAN Inception Score: {standard_is}",
    sep="\n",
)