# Loading the data

https://www.kaggle.com/datasets/jhoward/lsun_bedroom/data

@misc{yu2016lsun,
      title={LSUN: Construction of a Large-scale Image Dataset using Deep Learning with Humans in the Loop}, 
      author={Fisher Yu and Ari Seff and Yinda Zhang and Shuran Song and Thomas Funkhouser and Jianxiong Xiao},
      year={2016},
      eprint={1506.03365},
      archivePrefix={arXiv},
      primaryClass={cs.CV}
}

In [1]:
import os
import time

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision
from torchvision import models
from torchvision.transforms import v2
from torchvision.datasets import ImageFolder

from tqdm import tqdm

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

In [3]:
 data_path = './data/subset'
#data_path = './data/data0/lsun/bedroom'

In [4]:
# image size after transformations
image_size = 64

simple_load = v2.Compose([
    v2.Resize((image_size, image_size)),
    v2.PILToTensor(),
    v2.ToDtype(torch.float32),
    v2.Normalize([0.5], [0.5]),
])

# Training the models

### DDPM

https://huggi64ace.co/docs/diffusers/en/tutorials/basic_training

In [42]:
from diffusers import DDPMPipeline, DDPMScheduler, UNet2DModel
from diffusers.utils import make_image_grid
from datasets import load_dataset

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
dataset = load_dataset(data_path, split='train')
dataset.set_transform(simple_load)

Resolving data files:   0%|          | 0/303125 [00:00<?, ?it/s]

In [7]:
def get_unet():
    unet_model = UNet2DModel(
        sample_size=image_size,  # the target image resolution
        in_channels=3,  # the number of input channels, 3 for RGB images
        out_channels=3,  # the number of output channels
        layers_per_block=2,  # how many ResNet layers to use per UNet block
        block_out_channels=(128, 128, 256, 256, 512, 512),  # the number of output channels for each UNet block
        down_block_types=(
            "DownBlock2D",  # a regular ResNet downsampling block
            "DownBlock2D",
            "DownBlock2D",
            "DownBlock2D",
            "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
            "DownBlock2D",
        ),
        up_block_types=(
            "UpBlock2D",  # a regular ResNet upsampling block
            "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
            "UpBlock2D",
            "UpBlock2D",
            "UpBlock2D",
            "UpBlock2D",
        ),
    ).to(device)
    return unet_model

In [8]:
def evaluate(epoch, pipeline, save_name: str = 'ddpm_training', random_state: int | None = None):
    # Sample some images from random noise (this is the backward diffusion process).
    # The default pipeline output type is `List[PIL.Image]`
    images = pipeline(
        batch_size=16,
        generator=torch.manual_seed(random_state),
    ).images

    # Make a grid out of the images
    image_grid = make_image_grid(images, rows=4, cols=4)

    # Save the images
    test_dir = os.path.join('saved', save_name)
    os.makedirs(test_dir, exist_ok=True)
    image_grid.save(f"{test_dir}/{epoch:04d}.png")

In [9]:
def train_unet(model, n_epochs: int, noise_scheduler, optimizer, dataset, batch_size: int = 16, start_epoch: int = 0,
               save_every: int = 1, save_name: str = 'ddpm_training'):
    data_loader = DataLoader(dataset, batch_size)

    for i in range(start_epoch, n_epochs + start_epoch):
        epoch_no = i + 1
        print(f'Starting epoch {epoch_no}...')

        for batch in tqdm(data_loader):

            # move to cuda/cpu
            clean_images = batch["image"].to(device)

            # Sample noise to add to the images
            noise = torch.randn(clean_images.shape, device=device)
            bs = clean_images.shape[0]

            # Sample a random timestep for each image
            timesteps = torch.randint(
                0, noise_scheduler.config.num_train_timesteps, (bs,), device=device,
                dtype=torch.int64
            )

            # Add noise to the clean images according to the noise magnitude at each timestep
            # (this is the forward diffusion process)
            noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

            # Predict the noise residual
            noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
            loss = F.mse_loss(noise_pred, noise)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

        # evaluate every nth epoch
        if epoch_no % save_every == 0:
            print(f'Evaluating after epoch {epoch_no}...')
            # save model
            save_dir = f'saved/{save_name}'
            os.makedirs(save_dir, exist_ok=True)
            model.save_pretrained(f'{save_dir}/{epoch_no:04d}_model')
            # evaluate and save images
            pipeline = DDPMPipeline(unet=model, scheduler=noise_scheduler)
            evaluate(epoch_no, pipeline, save_name, random_state=epoch_no)
        
        # cooldown
        time.sleep(60 * 5)

In [10]:
noise_scheduler = DDPMScheduler(num_train_timesteps=1000)

Testing learning rate $10^{-3}$ 

In [2]:
unet_model = get_unet()
train_unet(
    unet_model, 
    n_epochs=5, 
    noise_scheduler=noise_scheduler, 
    optimizer=optim.Adam(unet_model.parameters()), # default lr = 0.001
    dataset=dataset, 
    batch_size=16,
    save_every=1,
    save_name='ddpm_training_lr1e-3',
)

Testing learning rate $10^{-4}$ 

In [11]:
unet_model = get_unet()
train_unet(
    unet_model, 
    n_epochs=20, 
    noise_scheduler=noise_scheduler, 
    optimizer=optim.Adam(unet_model.parameters(), lr=0.0001), 
    dataset=dataset, 
    batch_size=16,
    save_every=1,
    save_name='ddpm_training_lr1e-4',
)

Starting epoch 1...


  0%|          | 84/18946 [00:44<2:46:44,  1.89it/s]

Testing learning rate $10^{-5}$ 

In [None]:
unet_model = get_unet()
train_unet( 
    unet_model, 
    n_epochs=10, 
    noise_scheduler=noise_scheduler, 
    optimizer=optim.Adam(unet_model.parameters(), lr=1e-5), 
    dataset=dataset, 
    batch_size=16,
    save_every=1,
    save_name='ddpm_training_lr1e-5',
)

#### Testing regularisation

In [None]:
unet_model = get_unet()
train_unet(
    unet_model, 
    n_epochs=20, 
    noise_scheduler=noise_scheduler, 
    optimizer=optim.Adam(unet_model.parameters(), lr=1e-4, weight_decay=1e-3), 
    dataset=dataset, 
    batch_size=16,
    save_every=1,
    save_name='ddpm_training_reg1e-3',
)

In [None]:
unet_model = get_unet()
train_unet(
    unet_model, 
    n_epochs=20, 
    noise_scheduler=noise_scheduler, 
    optimizer=optim.Adam(unet_model.parameters(), lr=1e-4, weight_decay=1e-4), 
    dataset=dataset, 
    batch_size=16,
    save_every=1,
    save_name='ddpm_training_reg1e-4',
)

Starting epoch 1...


  0%|          | 84/18946 [00:44<2:46:44,  1.89it/s]

### GAN

In [28]:
from fastai.vision import *
from fastai.vision.gan import *
from fastai.vision.all import *
from fastai.losses import *
from fastai.metrics import *

In [29]:
def get_data(batch_size, path):
    class TorchVisionTransform(Transform):
        def __init__(self, tfms): self.tfms = tfms
        def encodes(self, img: PILImage): return self.tfms(img)
        
    dblock = DataBlock(
        blocks=(ImageBlock),
        get_items=get_image_files,
        splitter=FuncSplitter(lambda x: False),
        item_tfms=TorchVisionTransform(simple_load)
    )
    
    dls = dblock.dataloaders(path, bs=batch_size) 
    return dls

In [None]:
batch_size = 16
dls = get_data(batch_size, data_path)

In [31]:
class Generator(nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.network = nn.Sequential(
            # input is Z, going into a convolution
            nn.ConvTranspose2d(100, 64 * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(64 * 8),
            nn.ReLU(True),
            # state size. (64*8) x 4 x 4
            nn.ConvTranspose2d(64 * 8, 64 * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64 * 4),
            nn.ReLU(True),
            # state size. (64*4) x 8 x 8
            nn.ConvTranspose2d( 64 * 4, 64 * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64 * 2),
            nn.ReLU(True),
            # state size. (64*2) x 16 x 16
            nn.ConvTranspose2d( 64 * 2, 64, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            # state size. (64) x 32 x 32
            nn.ConvTranspose2d( 64, 3, 4, 2, 1, bias=False),
            nn.Tanh()
            # state size. (3) x 64 x 64
        )

    def forward(self, input):
        return self.network(input)


In [32]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()
        self.network = nn.Sequential(
            # input is (3) x 64 x 64
            nn.Conv2d(3, 64, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (64) x 32 x 32
            nn.Conv2d(64, 64 * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64 * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (64*2) x 16 x 16
            nn.Conv2d(64 * 2, 64 * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64 * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (64*4) x 8 x 8
            nn.Conv2d(64 * 4, 64 * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(64 * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # state size. (64*8) x 4 x 4
            nn.Conv2d(64 * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        return self.network(input)

In [44]:
def evaluate(epoch, generator, save_name: str = 'gan_training'):
    # Sample some images from random noise (this is the backward diffusion process).
    # The default pipeline output type is `List[PIL.Image]`
    noise = torch.randn(16, 100, 1, 1, device=device)

    batch = generator(noise)
    pil_images = []
    for tensor in batch:
        tensor = tensor.permute(1, 2, 0)
        tensor = (tensor * 0.5) + 0.5 #denormalize
        tensor = (tensor * 255).byte()
        pil_image = Image.fromarray(tensor.numpy())
        pil_images.append(pil_image)
    image_grid = make_image_grid(pil_images, rows=4, cols=4)

    # Save the images
    test_dir = os.path.join('saved', save_name)
    os.makedirs(test_dir, exist_ok=True)
    image_grid.save(f"{test_dir}/{epoch:04d}.png")

In [39]:
# Assuming you already have the necessary imports, data loading, and model definitions from the previous code

# Training function for vanilla GAN or DCGAN
def train_gan(generator, discriminator, dls, opt_gen, opt_disc, n_epochs, 
              start_epoch: int = 0, save_every: int = 1, save_name: str = 'gan_training'):
    generator.train()
    discriminator.train()
    
    criterion = nn.BCEWithLogitsLoss()
    
    for i in range(start_epoch, n_epochs + start_epoch):
        epoch_no = i + 1

        for real in tqdm(dls[0]):
            real = real[0]
            real = real.to(device)
            batch_size = real.size(0)
            
            # Train discriminator
            opt_disc.zero_grad()
            noise = torch.randn(batch_size, 100, 1, 1, device=device)

            fake = generator(noise)
            disc_real = discriminator(real)
            disc_fake = discriminator(fake.detach())
            loss_disc_real = criterion(disc_real, torch.ones_like(disc_real))
            loss_disc_fake = criterion(disc_fake, torch.zeros_like(disc_fake))
            loss_disc = (loss_disc_real + loss_disc_fake) / 2
            loss_disc.backward()
            opt_disc.step()
            
            # Train generator
            opt_gen.zero_grad()
            disc_fake = discriminator(fake)
            loss_gen = criterion(disc_fake, torch.ones_like(disc_fake))
            loss_gen.backward()
            opt_gen.step()
        
        print(f"Epoch {epoch_no} done! Loss discriminator: {loss_disc.item():.4f}, Loss generator: {loss_gen.item():.4f}")
        # evaluate every nth epoch
        if epoch_no % save_every == 0:
            print(f'Evaluating after epoch {epoch_no}...')
            # save model
            save_dir = f'saved/{save_name}'
            model_save_dir = f'{save_dir}/{epoch_no:04d}_model'
            os.makedirs(model_save_dir, exist_ok=True)

            generator_path = os.path.join(model_save_dir, 'generator.pth')
            discriminator_path = os.path.join(model_save_dir, 'discriminator.pth')
            torch.save(generator.state_dict(), generator_path)
            torch.save(discriminator.state_dict(), discriminator_path)
            
            evaluate(epoch_no, generator, save_name)


In [None]:
def load_gan(path):
    generator = Generator().to(device)
    generator.load_state_dict(torch.load(f"{path}/generator.pth", map_location=torch.device(device)))

    discriminator = Discriminator().to(device)
    discriminator.load_state_dict(torch.load(f"{path}/discriminator.pth", map_location=torch.device(device)))
    
    return generator, discriminator

In [46]:
model_generator = Generator().to(device)
model_discriminator = Discriminator().to(device)

opt_gen = optim.Adam(model_generator.parameters(), lr=0.0002, betas=(0.5, 0.999))
opt_disc = optim.Adam(model_discriminator.parameters(), lr=0.0002, betas=(0.5, 0.999))

train_gan(model_generator, model_discriminator, dls, opt_gen, opt_disc, n_epochs=3)

100%|██████████| 76/76 [00:11<00:00,  6.37it/s]


Epoch 1 done! Loss D: 0.5032, Loss G: 0.6931
Evaluating after epoch 1...


100%|██████████| 76/76 [00:11<00:00,  6.50it/s]


Epoch 2 done! Loss D: 0.5032, Loss G: 0.6931
Evaluating after epoch 2...


100%|██████████| 76/76 [00:12<00:00,  5.90it/s]


Epoch 3 done! Loss D: 0.5032, Loss G: 0.6931
Evaluating after epoch 3...


Testing learning rate $10^{-3}$ 

In [None]:
model_generator = Generator().to(device)
model_discriminator = Discriminator().to(device)

opt_gen = optim.Adam(model_generator.parameters(), lr=0.001)
opt_disc = optim.Adam(model_discriminator.parameters(), lr=0.001)

train_gan(model_generator, model_discriminator, dls, opt_gen, opt_disc, n_epochs=10, save_name = 'gan_training_lr1e-3')

Testing learning rate $10^{-4}$ 

In [None]:
model_generator = Generator().to(device)
model_discriminator = Discriminator().to(device)

opt_gen = optim.Adam(model_generator.parameters(), lr=1e-4)
opt_disc = optim.Adam(model_discriminator.parameters(), lr=1e-4)

train_gan(model_generator, model_discriminator, dls, opt_gen, opt_disc, n_epochs=10, save_name = 'gan_training_lr1e-4')

Testing learning rate $10^{-5}$ 

In [None]:
model_generator = Generator().to(device)
model_discriminator = Discriminator().to(device)

opt_gen = optim.Adam(model_generator.parameters(), lr=1e-5)
opt_disc = optim.Adam(model_discriminator.parameters(), lr=1e-5)

train_gan(model_generator, model_discriminator, dls, opt_gen, opt_disc, n_epochs=10, save_name = 'gan_training_lr1e-5')