In [None]:
from albumentations.augmentations.crops.transforms import CenterCrop
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.modules.activation import Tanh
from torch.nn.modules.batchnorm import BatchNorm2d
import torch.optim as optim
import albumentations as A
from albumentations.pytorch import ToTensorV2
from dataset import WaterBodyGeneratorDataset
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torchvision.utils as vutils
import random


#### GLOBALS
# Module-level defaults. Only ngf and ndf are read by the classes below;
# the remaining names are not referenced in the visible code.
nz = 100            # presumably the latent-vector length -- TODO confirm
ngf = 256           # base channel width scaled up inside Generator
ndf = 256           # base channel width scaled up inside Discriminator
nc = 3              # presumably the number of image channels -- TODO confirm
lr = 2e-4           # presumably the optimiser learning rate -- TODO confirm
beta1 = 0.5         # presumably Adam's first beta -- TODO confirm
num_epochs = 5      # presumably the default epoch count -- TODO confirm


def weights_init(m):
    """Re-initialise one layer in place, chosen by its class name.

    Meant to be passed to ``module.apply``. A layer whose class name contains
    "Conv" gets weights drawn from a normal distribution (mean 0.0, std 0.02).
    A layer whose class name contains "BatchNorm" gets weights drawn from a
    normal distribution (mean 1.0, std 0.02) and a bias filled with zeros.
    Every other layer is left untouched.
    """
    layer_kind = type(m).__name__

    if "Conv" in layer_kind:
        nn.init.normal_(m.weight.data, mean=0.0, std=0.02)
        return

    if "BatchNorm" in layer_kind:
        nn.init.normal_(m.weight.data, mean=1.0, std=0.02)
        nn.init.constant_(m.bias.data, val=0)

class GeneratorBlock(nn.Module):
    """Generator stage: bias-free transposed convolution, batch norm, in-place ReLU."""
    def __init__(self, features_in, features_out, kernel_size, stride, padding):
        super().__init__()
        # Layers are created in the same order as they are applied.
        upsample = nn.ConvTranspose2d(
            features_in,
            features_out,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        normalise = nn.BatchNorm2d(features_out)
        activate = nn.ReLU(inplace=True)
        self.block = nn.Sequential(upsample, normalise, activate)

    def forward(self, x):
        """Apply the three layers in order and return the result."""
        return self.block(x)

class Generator(nn.Module):
    """DCGAN generator built from a stack of ``GeneratorBlock`` stages.

    The first stage (kernel 4, stride 1, no padding) expands the latent input
    to ``ngf * 32`` channels. Five stride-2 stages then halve the channel
    count each time, down to ``ngf``. A final transposed convolution followed
    by ``Tanh`` produces ``channels_out`` channels. With a 1x1 spatial input
    these layer settings give a 256x256 output.
    """
    def __init__(self, latent_vector_size, channels_out):
        super().__init__()

        # Channel widths after each stage, widest first.
        widths = [ngf * factor for factor in (32, 16, 8, 4, 2, 1)]

        stages = [
            GeneratorBlock(latent_vector_size, widths[0], kernel_size=4, stride=1, padding=0)
        ]
        for wide, narrow in zip(widths[:-1], widths[1:]):
            stages.append(GeneratorBlock(wide, narrow, kernel_size=4, stride=2, padding=1))
        self.main = nn.ModuleList(stages)

        self.last = nn.Sequential(
            nn.ConvTranspose2d(widths[-1], channels_out, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        """Run ``x`` through every stage in order, then through the output head."""
        for stage in self.main:
            x = stage(x)
        return self.last(x)


class DiscriminatorBlock(nn.Module):
    """Discriminator stage: bias-free convolution, batch norm, in-place LeakyReLU(0.2)."""
    def __init__(self, features_in, features_out, kernel_size, stride, padding):
        super().__init__()
        # Layers are created in the same order as they are applied.
        downsample = nn.Conv2d(
            features_in,
            features_out,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        normalise = nn.BatchNorm2d(features_out)
        activate = nn.LeakyReLU(negative_slope=0.2, inplace=True)
        self.block = nn.Sequential(downsample, normalise, activate)

    def forward(self, x):
        """Apply the three layers in order and return the result."""
        return self.block(x)

class Discriminator(nn.Module):
    """DCGAN discriminator that reduces an image batch to one sigmoid score per sample.

    A stem convolution (no batch norm) maps ``input_size`` channels to ``ndf``.
    Five stride-2 ``DiscriminatorBlock`` stages then double the channel count
    each time, up to ``ndf * 32``. A final 4x4 convolution followed by
    ``Sigmoid`` produces a single output channel.
    """
    def __init__(self, input_size):
        super().__init__()

        # Channel widths after each stage, narrowest first.
        widths = [ndf * factor for factor in (1, 2, 4, 8, 16, 32)]

        self.input = nn.Sequential(
            nn.Conv2d(input_size, widths[0], kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2, inplace=True),
        )

        self.main = nn.ModuleList([
            DiscriminatorBlock(narrow, wide, kernel_size=4, stride=2, padding=1)
            for narrow, wide in zip(widths[:-1], widths[1:])
        ])

        self.last = nn.Sequential(
            nn.Conv2d(widths[-1], 1, kernel_size=4, stride=1, padding=0, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Run ``x`` through the stem, every stage in order, then the scoring head."""
        x = self.input(x)
        for stage in self.main:
            x = stage(x)
        return self.last(x)

In [None]:

class WaterBodyGeneratorDataset(torch.utils.data.Dataset):
    """Image-only dataset over the regular files found in ``<root>/Images``.

    Args:
        root: Directory that contains an ``Images`` sub-directory.
        transform: Optional callable applied to each loaded RGB image.

    Each item is the (optionally transformed) image alone; no label is
    returned.
    """
    def __init__(self, root, transform=None):
        super().__init__()

        self._image_data = os.path.join(root, "Images")
        # os.listdir returns entries in arbitrary order, so sort to keep the
        # index -> file mapping stable between runs. Sub-directories are
        # skipped because they cannot be opened as images.
        self._filenames = sorted(
            name
            for name in os.listdir(self._image_data)
            if os.path.isfile(os.path.join(self._image_data, name))
        )
        self.transform = transform
        self._length = len(self._filenames)

    def __getitem__(self, index):
        """Load file ``index`` as RGB and apply ``transform`` when one is set."""
        img_path = os.path.join(self._image_data, self._filenames[index])
        # NOTE(review): Image is presumably PIL.Image -- confirm it is imported.
        # The context manager releases the file handle; convert() yields an
        # independent image that stays valid after the file is closed.
        with Image.open(img_path) as source:
            img = source.convert("RGB")
        if self.transform:
            img = self.transform(img)

        return img

    def __len__(self):
        """Number of image files discovered at construction time."""
        return self._length

In [None]:
def dcgan_train(generator:nn.Module, discriminator:nn.Module, criterion, optimizerD, optimizerG, dataloader:torch.utils.data.DataLoader, latent_size:int=100):
    """Run one pass over ``dataloader``, alternating discriminator and generator updates.

    For every batch the discriminator is stepped on a real batch and on a
    detached fake batch, then the generator is stepped so that its fakes are
    scored as real. Progress is printed every 50 batches.

    Args:
        generator: Maps noise of shape ``(batch, latent_size, 1, 1)`` to images.
        discriminator: Maps images to one score per sample.
        criterion: Loss called as ``criterion(scores, labels)``.
        optimizerD: Optimiser stepping the discriminator parameters.
        optimizerG: Optimiser stepping the generator parameters.
        dataloader: Yields either image tensors or ``(images, ...)`` sequences.
        latent_size: Channel count of the sampled noise (defaults to 100).

    Returns:
        None. Both models are updated in place.
    """
    loop = tqdm(dataloader)
    real_label = 1
    fake_label = 0
    # Use the generator's own device rather than a notebook-level global, so
    # the function also works when it is imported from a module.
    device = next(generator.parameters()).device

    for batch_idx, data in enumerate(loop):

        # Prepare real data. A dataset that yields bare images collates to a
        # single tensor; indexing that with [0] would select one image rather
        # than the batch. Only unpack when the batch really is a sequence.
        real_img = data[0] if isinstance(data, (list, tuple)) else data
        real_img = real_img.to(device)
        batch_size = real_img.size(0)
        label = torch.full((batch_size,), real_label, dtype=torch.float, device=device)

        ##### TRAIN DISCRIMINATOR ON REAL IMAGES
        discriminator.zero_grad()
        output = discriminator(real_img).view(-1)

        errD_real = criterion(output, label)
        errD_real.backward()

        # To print later
        D_x = output.mean().item()

        ##### TRAIN DISCRIMINATOR ON FAKE IMAGES
        noise = torch.randn(batch_size, latent_size, 1, 1, device=device)

        fake = generator(noise)
        label.fill_(fake_label)

        # detach() keeps this backward pass out of the generator's graph
        output = discriminator(fake.detach()).view(-1)

        errD_fake = criterion(output, label)
        errD_fake.backward()

        # To print later
        D_G_z1 = output.mean().item()
        errD = errD_real + errD_fake

        optimizerD.step()

        ##### TRAIN GENERATOR
        generator.zero_grad()
        # The generator is rewarded when its fakes are scored as real
        label.fill_(real_label)
        output = discriminator(fake).view(-1)

        errG = criterion(output, label)
        errG.backward()

        optimizerG.step()

        # To print later
        D_G_z2 = output.mean().item()

        #### Verbose
        if batch_idx % 50 == 0:
            print(f"Loss_D: {errD.item()} Loss_G: {errG.item()}   D(x): {D_x}   D(G(z)): {D_G_z1}   |    {D_G_z2}")
        






def get_output(net, input):
    """Run ``net`` on ``input`` without gradient tracking and tile the result.

    The output is moved to the CPU and passed to ``vutils.make_grid`` with
    ``padding=2`` and ``normalize=True``; that grid is returned.
    """
    with torch.no_grad():
        generated = net(input)
        generated = generated.detach().cpu()
    grid = vutils.make_grid(generated, padding=2, normalize=True)
    return grid

In [None]:
#################################################################

In [None]:
from dcgan import Generator, Discriminator, weights_init
from train import dcgan_train, get_output
from dataset import WaterBodyGeneratorDataset

import albumentations as A
import torch
import torchvision.datasets as dset
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.animation as animation
from IPython.display import HTML

In [None]:
# Run configuration shared by the cells below.
VEC_SIZE = 100          # latent-vector length passed to Generator
CHANNELS_OUT = 3        # channels produced by Generator / consumed by Discriminator
# Fall back to the CPU when CUDA is unavailable, so the notebook does not
# fail at the first .to(DEVICE) call on such machines.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMAGE_SIZE = (256,256)  # size handed to the resize/crop transforms
BATCH_SIZE = 64
LEARNING_RATE = 2e-4    # Adam learning rate for both optimisers
BETA1 = 0.5             # first Adam beta for both optimisers
EPOCHS = 3

In [None]:
# Build the generator, move it to the target device, then re-initialise its
# layers. The trailing expression displays the module structure.
generator = Generator(VEC_SIZE, CHANNELS_OUT)
generator = generator.to(DEVICE)
generator.apply(weights_init)

In [None]:
# Build the discriminator, move it to the target device, then re-initialise
# its layers. The trailing expression displays the module structure.
discriminator = Discriminator(CHANNELS_OUT)
discriminator = discriminator.to(DEVICE)
discriminator.apply(weights_init)

In [None]:
# Albumentations pipeline: resize, normalise with mean 0 / std 1, convert to
# a tensor. It is not referenced by any other visible cell.
tf = A.Compose([
    A.Resize(*IMAGE_SIZE),
    A.Normalize(mean=[0.0, 0.0, 0.0], std=[1.0, 1.0, 1.0], max_pixel_value=255.0),
    A.pytorch.ToTensorV2(),
])

# torchvision pipeline handed to the dataset: resize, centre-crop, convert to
# a tensor, then normalise each channel with mean 0.5 and std 0.5.
tf2 = transforms.Compose([
    transforms.Resize(IMAGE_SIZE),
    transforms.CenterCrop(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])

In [None]:
# Dataset root; the dataset class looks for an "Images" folder beneath it.
# NOTE(review): absolute, machine-specific path -- adjust for your environment.
dataroot = "D:/AI_ML/Kaggle/Water Bodies Dataset/"
dataset = WaterBodyGeneratorDataset(dataroot, tf2)
# Shuffled batches, loaded in the main process (num_workers=0).
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=0,
)

In [None]:
# Binary cross-entropy on the discriminator's sigmoid scores.
criterion = torch.nn.BCELoss()

# One Adam optimiser per network, with identical hyperparameters.
optimizerG = torch.optim.Adam(
    generator.parameters(),
    lr=LEARNING_RATE,
    betas=(BETA1, 0.999),
)
optimizerD = torch.optim.Adam(
    discriminator.parameters(),
    lr=LEARNING_RATE,
    betas=(BETA1, 0.999),
)

# Collects one sample grid per epoch for the animation cell.
img_list = []

In [None]:
# Fixed latent vectors, reused after every epoch so the sample grids are
# comparable over time. The first dimension is the number of preview samples;
# it was previously IMAGE_SIZE[0] (the image height, 256), which is unrelated
# to a sample count. BATCH_SIZE is used instead.
FIXED_NOISE = torch.randn(BATCH_SIZE, VEC_SIZE, 1, 1, device=DEVICE)
for epoch in range(EPOCHS):
    dcgan_train(generator=generator, discriminator=discriminator, criterion=criterion, optimizerD=optimizerD, optimizerG=optimizerG, dataloader=dataloader)
    # Snapshot the generator's output on the fixed noise for the animation
    out = get_output(net=generator, input=FIXED_NOISE)
    img_list.append(out)

In [None]:
# Animate the per-epoch sample grids, one frame per entry in img_list.
fig = plt.figure(figsize=(10, 10))
plt.axis("off")
# Each grid is channel-first; imshow expects channel-last.
ims = [
    [plt.imshow(np.transpose(grid, (1, 2, 0)), animated=True)]
    for grid in img_list
]
ani = animation.ArtistAnimation(
    fig,
    ims,
    interval=1000,
    repeat_delay=1000,
    blit=True,
)

HTML(ani.to_jshtml())