In [1]:
# connect with google drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torchvision
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F

In [None]:
# Discriminator Class

def discriminator_conv(in_channel, out_channel, kernel_size=3, stride=2, padding=1, batch_norm=True, bias=False):
    layers = [nn.Conv2d(in_channel, out_channel, kernel_size, stride=stride, padding=padding, bias=bias)]

    if batch_norm:
        layers.append(nn.BatchNorm2d(out_channel))

    return nn.Sequential(*layers)


class Discriminator(nn.Module):
    def __init__(self, number_of_channel, discriminator_dim):
        super(Discriminator, self).__init__()

        self.disc_conv1 = discriminator_conv(number_of_channel, discriminator_dim, kernel_size=4, stride=2, padding=1, batch_norm=False)
        self.disc_conv2 = discriminator_conv(discriminator_dim, discriminator_dim * 2, kernel_size=4, stride=2, padding=1, batch_norm=True)
        self.disc_conv3 = discriminator_conv(discriminator_dim * 2, discriminator_dim * 4, kernel_size=4, stride=2, padding=1, batch_norm=True)
        self.disc_conv4 = discriminator_conv(discriminator_dim * 4, discriminator_dim * 8, kernel_size=4, stride=2, padding=1, batch_norm=True)

        self.disc_conv5 = discriminator_conv(discriminator_dim * 8, 1, kernel_size=4, stride=1, padding=0, batch_norm=False)


    def forward(self, mzg):
        out = F.leaky_relu(self.disc_conv1(mzg), inplace=True, negative_slope=0.2)
        out = F.leaky_relu(self.disc_conv2(out), inplace=True, negative_slope=0.2)
        out = F.leaky_relu(self.disc_conv3(out), inplace=True, negative_slope=0.2)
        out = F.leaky_relu(self.disc_conv4(out), inplace=True, negative_slope=0.2)

        out = torch.sigmoid(self.disc_conv5(out))

        return out

In [None]:
# Generator Class

def generator_conv(in_channel, out_channel, kernel_size=3, stride=2,padding=1, batch_norm=True, bias=False):
    layers = [nn.ConvTranspose2d(in_channel, out_channel, kernel_size=kernel_size, stride=stride, padding=padding, bias=bias)]

    if batch_norm:
        layers.append(nn.BatchNorm2d(out_channel))

    return nn.Sequential(*layers)


class Generator(nn.Module):
    def __init__(self, num_of_z, number_of_channel, generator_dim):
        super(Generator, self).__init__()

        self.gen_conv1 = generator_conv(num_of_z, generator_dim * 8, kernel_size=4, stride=1, padding=0, batch_norm=True)
        self.gen_conv2 = generator_conv(generator_dim * 8, generator_dim * 4, kernel_size=4, stride=2 , padding=1, batch_norm=True)
        self.gen_conv3 = generator_conv(generator_dim * 4, generator_dim * 2, kernel_size=4, stride=2, padding=1, batch_norm=True)
        self.gen_conv4 = generator_conv(generator_dim * 2, generator_dim, kernel_size=4, stride=2, padding=1, batch_norm=True)

        self.gen_conv5 = generator_conv(generator_dim, number_of_channel, kernel_size=4, stride=2, padding=1, batch_norm=False)


    def forward(self, mzg):
        out = F.relu(self.gen_conv1(mzg))
        out = F.relu(self.gen_conv2(out))
        out = F.relu(self.gen_conv3(out))
        out = F.relu((self.gen_conv4(out)))

        out = torch.tanh(self.gen_conv5(out))

        return out

In [None]:
# custom weights initialization called on netG and netD

def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [None]:
"""
From DCGAN Paper
"""


# Number of workers for dataloader
workers = 8

# Batch size during training
batch_size = 128

# Spatial size of training images. All images will be resized to this
# size using a transformer.
image_size = 128

# Number of channels in the training images. For color images this is 3
nc = 3  

# Size of z latent vector (i.e. size of generator input)
nz = 100

# Size of feature maps in generator
ngf = 64

# Size of feature maps in discriminator
ndf = 64

# Number of training epochs
num_epochs = 100

# Learning rate for optimizers
lr = 0.0002

# Beta1 hyperparam for Adam optimizers
beta1 = 0.5

# Number of GPUs available. Use 0 for CPU mode.
ngpu = 0

In [None]:
"""
You must change CustomData class according to your dataset 

If you have a dataset where you can follow the classic data loading operations of Pytorch, you can use it.
There are resources on how to load datasets that have classes in other resources

"""


from PIL import Image
import os
from torch.utils.data import Dataset
import numpy as np


class CustomData(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform

        self.images = os.listdir(data_dir)

        self.images_len = len(self.images)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, item):
        img = self.images[item % self.images_len]

        img_path = os.path.join(self.data_dir, img)

        img = Image.open(img_path).convert("RGB")

        img = self.transform(img)

        return img


In [None]:
transform = transforms.Compose([
    transforms.Resize((image_size, image_size)),
    transforms.ToTensor(),
    transforms.Normalize(
        [0.5 for _ in range(nc)], [0.5 for _ in range(nc)]
    ),
])

# Dataset
dataset = CustomData('/content/drive/images/', transform = transform)

dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

In [None]:
dataset.__len__()

986

In [None]:
gen = Generator(nz, nc, ngf)
disc = Discriminator(nc, ndf)

weights_init(gen)
weights_init(disc)

In [None]:
gen

Generator(
  (gen_conv1): Sequential(
    (0): ConvTranspose2d(100, 512, kernel_size=(4, 4), stride=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (gen_conv2): Sequential(
    (0): ConvTranspose2d(512, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (gen_conv3): Sequential(
    (0): ConvTranspose2d(256, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (gen_conv4): Sequential(
    (0): ConvTranspose2d(128, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (gen_conv5): Sequential(
    (0): ConvTranspose2d(64, 3, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
  )
)

In [None]:
disc

Discriminator(
  (disc_conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
  )
  (disc_conv2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv5): Sequential(
    (0): Conv2d(512, 1, kernel_size=(4, 4), stride=(1, 1), bias=False)
  )
)

In [None]:
opt_gen = optim.Adam(gen.parameters(), lr=lr, betas=(0.5, 0.999))
opt_disc = optim.Adam(disc.parameters(), lr=lr, betas=(0.5, 0.999))
criterion = nn.BCELoss()

fixed_noise = torch.randn(32, nz, 1, 1)

writer_real = SummaryWriter(f"logs/real")
writer_fake = SummaryWriter(f"logs/fake")

step = 0

In [None]:
gen.train()
disc.train()

Discriminator(
  (disc_conv1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
  )
  (disc_conv2): Sequential(
    (0): Conv2d(64, 128, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv3): Sequential(
    (0): Conv2d(128, 256, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv4): Sequential(
    (0): Conv2d(256, 512, kernel_size=(4, 4), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (disc_conv5): Sequential(
    (0): Conv2d(512, 1, kernel_size=(4, 4), stride=(1, 1), bias=False)
  )
)

In [None]:
for epoch in range(num_epochs):
    # Target labels not needed! <3 unsupervised
    for batch_idx, real in enumerate(dataloader):
        noise = torch.randn(batch_size, nz, 1, 1)
        fake = gen(noise)

        ### Train Discriminator: max log(D(x)) + log(1 - D(G(z)))
        disc_real = disc(real).reshape(-1)
        loss_disc_real = criterion(disc_real, torch.ones_like(disc_real))
        disc_fake = disc(fake.detach()).reshape(-1)
        loss_disc_fake = criterion(disc_fake, torch.zeros_like(disc_fake))
        loss_disc = (loss_disc_real + loss_disc_fake) / 2
        disc.zero_grad()
        loss_disc.backward()
        opt_disc.step()

        ### Train Generator: min log(1 - D(G(z))) <-> max log(D(G(z))
        output = disc(fake).reshape(-1)
        loss_gen = criterion(output, torch.ones_like(output))
        gen.zero_grad()
        loss_gen.backward()
        opt_gen.step()

        # Print losses occasionally and print to tensorboard
        if batch_idx % 10 == 0:
            print(
                f"Epoch [{epoch}/{num_epochs}] Batch {batch_idx}/{len(dataloader)} \
                  Loss D: {loss_disc:.4f}, loss G: {loss_gen:.4f}"
            )

            with torch.no_grad():
                fake = gen(fixed_noise)
                # take out (up to) 32 examples
                img_grid_real = torchvision.utils.make_grid(
                    real[:32], normalize=True
                )
                img_grid_fake = torchvision.utils.make_grid(
                    fake[:32], normalize=True
                )

                writer_real.add_image("Real", img_grid_real, global_step=step)
                writer_fake.add_image("Fake", img_grid_fake, global_step=step)

            step += 1

Epoch [0/100] Batch 0/8                   Loss D: 0.7388, loss G: 4.2369
Epoch [1/100] Batch 0/8                   Loss D: 0.1205, loss G: 6.1221
Epoch [2/100] Batch 0/8                   Loss D: 0.0495, loss G: 7.1965
Epoch [3/100] Batch 0/8                   Loss D: 0.0285, loss G: 7.8416
Epoch [4/100] Batch 0/8                   Loss D: 0.0176, loss G: 7.7707
Epoch [5/100] Batch 0/8                   Loss D: 0.0121, loss G: 8.1196
Epoch [6/100] Batch 0/8                   Loss D: 0.0112, loss G: 7.7669
Epoch [7/100] Batch 0/8                   Loss D: 0.0087, loss G: 7.9247
Epoch [8/100] Batch 0/8                   Loss D: 0.0081, loss G: 7.8798
Epoch [9/100] Batch 0/8                   Loss D: 0.0065, loss G: 7.5958


In [None]:
# display results with using tensorboard
%load_ext tensorboard
%tensorboard --logdir /content/logs/real

In [None]:
# display results with using tensorboard
%load_ext tensorboard
%tensorboard --logdir /content/logs/fake

In [None]:
# Specify a path to save to
PATH = "model_DCGAN.pt"

torch.save({
            'modelG_state_dict': gen.state_dict(),
            'modelD_state_dict': disc.state_dict(),
            'optimizerG_state_dict': opt_gen.state_dict(),
            'optimizerD_state_dict': opt_disc.state_dict(),
            }, PATH)