In [1]:
import os
import sys

# One-time notebook bootstrap: make the parent directory both importable and
# the working directory. The `has_changed_dir` flag keeps a re-run of this cell
# from climbing one more directory level.
if 'has_changed_dir' not in globals():
    repo_path = os.path.abspath('..')

    if repo_path not in sys.path:
        sys.path.append(repo_path)

    os.chdir(repo_path)
    has_changed_dir = True

import numpy as np
from PIL import Image
from tqdm import tqdm

import neunet as nnet
import neunet.nn as nn
from data_loader import load_mnist
from neunet.optim import Adam

In [2]:
# --- Sample-grid layout -----------------------------------------------------
image_size = (1, 28, 28)  # indices 1 and 2 are used as image height and width
x_num = y_num = 5  # grid columns / rows used when tiling generated samples
samples_num = y_num * x_num
margin = 15  # padding, in pixels, between neighbouring tiles of the grid

# --- Model / runtime settings -----------------------------------------------
noise_size = 100  # length of the latent vector fed to the generator
device = "cuda"

# --- Data -------------------------------------------------------------------
dataset, _, _, _ = load_mnist()
# Rescale to [-1, 1] so real images share the range of the generator's Tanh
# output (assumes load_mnist returns pixel values in [0, 255] — TODO confirm).
dataset = dataset / 127.5 - 1

In [3]:
# Generator: latent vector (noise_size) -> flattened 28x28 image (784 values).
# The final Tanh bounds the output to (-1, 1).
generator_layers = [
    nn.Linear(noise_size, 256),
    nn.LeakyReLU(),
    nn.BatchNorm1d(256),
    nn.Linear(256, 512),
    nn.Dropout(0.2),
    nn.BatchNorm1d(512),
    nn.LeakyReLU(),
    nn.Linear(512, 784),
    nn.Tanh(),
]
generator = nn.Sequential(*generator_layers).to(device)

# Discriminator: flattened image (784 values) -> one score squashed by Sigmoid.
discriminator_layers = [
    nn.Linear(784, 128),
    nn.LeakyReLU(),
    nn.Linear(128, 64),
    nn.LeakyReLU(),
    nn.Linear(64, 1),
    nn.Sigmoid(),
]
discriminator = nn.Sequential(*discriminator_layers).to(device)

# Mean-squared error between the discriminator score and the 1/0 target.
loss_fn = nn.MSELoss()

# Both networks are optimised with identical Adam settings.
adam_settings = {"lr": 0.001, "betas": (0.5, 0.999)}
g_optimizer = Adam(generator.parameters(), **adam_settings)
d_optimizer = Adam(discriminator.parameters(), **adam_settings)

In [4]:
batch_size = 100
epochs = 30

# The per-epoch PNG previews are written here; create the folder up front so
# the first `image.save` cannot fail on a fresh checkout.
os.makedirs("generated images", exist_ok=True)

# Snapshots of the generator output, one (samples_num, 28, 28) array per epoch.
each_epoch_generated_samples = []
# Fixed latent vectors: sampling the same noise every epoch makes the snapshots
# directly comparable. Set to None to draw fresh noise each epoch instead.
const_noise = nnet.tensor(
    np.random.normal(0, 1, (samples_num, noise_size)),
    device=device,
)

for epoch in range(epochs):
    tqdm_range = tqdm(range(0, len(dataset), batch_size), desc=f"epoch {epoch}")
    generator.train()
    discriminator.train()
    for i in tqdm_range:
        batch = dataset[i : i + batch_size]
        batch = nnet.tensor(batch, device=device)

        d_optimizer.zero_grad()

        # train discriminator on real data (target score: 1)
        real_data = batch
        real_data = real_data.reshape(real_data.shape[0], -1)

        real_data_prediction = discriminator(real_data)
        real_data_loss = loss_fn(
            real_data_prediction,
            nnet.tensor(
                np.ones((real_data_prediction.shape[0], 1)),
                device=device,
            ),
        )
        real_data_loss.backward()
        d_optimizer.step()

        # train discriminator on fake data (target score: 0)
        # NOTE(review): there is no d_optimizer.zero_grad() between the real
        # and the fake step. If neunet accumulates gradients across backward()
        # calls, this second step also re-applies the real-data gradients —
        # confirm this is intended before changing it.
        noise = nnet.tensor(
            np.random.normal(0, 1, (batch_size, noise_size)),
            device=device,
        )
        fake_data = generator(noise)
        fake_data_prediction = discriminator(fake_data)
        fake_data_loss = loss_fn(
            fake_data_prediction,
            nnet.tensor(
                np.zeros((fake_data_prediction.shape[0], 1)),
                device=device,
            ),
        )
        fake_data_loss.backward()
        d_optimizer.step()

        # train generator: push the discriminator score on fakes towards 1
        g_optimizer.zero_grad()

        noise = nnet.tensor(
            np.random.normal(0, 1, (batch_size, noise_size)),
            device=device,
        )
        fake_data = generator(noise)
        fake_data_prediction = discriminator(fake_data)
        fake_data_loss = loss_fn(
            fake_data_prediction,
            nnet.tensor(
                np.ones((fake_data_prediction.shape[0], 1)),
                device=device,
            ),
        )
        fake_data_loss.backward()
        g_optimizer.step()

        # Progress-bar metrics only: these log-losses are reported for
        # monitoring, while the optimised objective is the MSE loss above.
        g_loss = -np.log(fake_data_prediction.detach().cpu().numpy()).mean()
        d_loss = (
            -np.log(real_data_prediction.detach().cpu().numpy()).mean()
            - np.log(1 - fake_data_prediction.detach().cpu().numpy()).mean()
        )
        tqdm_range.set_description(
            f"epoch: {epoch + 1}/{epochs}, G loss: {g_loss:.7f}, D loss: {d_loss:.7f}"
        )

    # Switch to evaluation mode *before* drawing the preview samples, so the
    # generator's Dropout / BatchNorm1d layers are not left in training mode
    # while the snapshot is produced.
    generator.eval()
    discriminator.eval()

    # Identity check: `== None` on a tensor would go through the tensor's own
    # comparison operator rather than test for a missing value.
    if const_noise is None:
        # Exactly `samples_num` previews are consumed below, so draw that many.
        noise = nnet.tensor(
            np.random.normal(0, 1, (samples_num, noise_size)),
            device=device,
        )
    else:
        noise = const_noise

    each_epoch_generated_samples.append(generator(noise).detach().cpu().numpy().reshape(-1, 28, 28))

    # Dump the latest snapshot as individual PNGs (overwritten every epoch).
    for i in range(samples_num):
        # Map the Tanh range [-1, 1] back to 8-bit pixel values [0, 255].
        image = each_epoch_generated_samples[-1][i] * 127.5 + 127.5
        image = image.astype(np.uint8)
        image = image.reshape(28, 28)
        image = Image.fromarray(image)
        image.save(f"generated images/{i}.png")

epoch: 1/30, G loss: 0.4438888, D loss: 1.5700481: 100%|██████████| 600/600 [00:22<00:00, 26.90it/s]
epoch: 2/30, G loss: 0.4623058, D loss: 1.4214900: 100%|██████████| 600/600 [00:19<00:00, 30.44it/s]
epoch: 3/30, G loss: 0.3498764, D loss: 1.9643488: 100%|██████████| 600/600 [00:20<00:00, 29.10it/s]
epoch: 4/30, G loss: 0.4362092, D loss: 1.4030291: 100%|██████████| 600/600 [00:20<00:00, 29.32it/s]
epoch: 5/30, G loss: 0.4470197, D loss: 1.4730237: 100%|██████████| 600/600 [00:20<00:00, 28.65it/s]
epoch: 6/30, G loss: 0.4218569, D loss: 1.4497340: 100%|██████████| 600/600 [00:20<00:00, 29.94it/s]
epoch: 7/30, G loss: 0.4156743, D loss: 1.4897844: 100%|██████████| 600/600 [00:20<00:00, 29.47it/s]
epoch: 8/30, G loss: 0.4309656, D loss: 1.4677904: 100%|██████████| 600/600 [00:19<00:00, 30.03it/s]
epoch: 9/30, G loss: 0.4034271, D loss: 1.5358443: 100%|██████████| 600/600 [00:20<00:00, 29.98it/s]
epoch: 10/30, G loss: 0.4116855, D loss: 1.4903698: 100%|██████████| 600/600 [00:20<00:00, 

In [None]:

# Put both networks into evaluation mode before the visualisation cells below.
for network in (generator, discriminator):
    network.eval()

In [None]:

def get_images_set(images):
    """Tile the first ``x_num * y_num`` images into a single grid picture.

    Images are placed row by row on a white (255) ``uint8`` canvas, with
    ``margin`` pixels of padding between neighbouring tiles. The tile size is
    taken from ``image_size[1]`` (height) and ``image_size[2]`` (width).

    Args:
        images: indexable collection of 2-D arrays; entries ``0`` to
            ``x_num * y_num - 1`` are read and written into the ``uint8``
            canvas.

    Returns:
        The assembled grid as a PIL image in mode ``"L"``.
    """
    tile_height, tile_width = image_size[1], image_size[2]
    cell_height = tile_height + margin
    cell_width = tile_width + margin

    canvas = np.full((y_num * cell_height, x_num * cell_width), 255, dtype=np.uint8)

    for index in range(y_num * x_num):
        row, column = divmod(index, x_num)
        top = row * cell_height
        left = column * cell_width
        canvas[top : top + tile_height, left : left + tile_width] = images[index]

    # Trim the margin that trails the last row and the last column.
    canvas = canvas[: y_num * cell_height - margin, : x_num * cell_width - margin]

    return Image.fromarray(canvas).convert("L")

In [None]:
def create_vectors_interpolation(
    steps=10,
    interval=15,
    duration=100,
    output_path="generated images/gan_vectors_interpolation.gif",
):
    """Save a GIF that walks through the generator's latent space.

    A chain of ``steps + 1`` random noise batches is drawn (each of shape
    ``(samples_num, noise_size)``). Consecutive batches are linearly
    interpolated, every interpolated batch is passed through the generator,
    and the resulting images are tiled into one frame with ``get_images_set``.

    Args:
        steps: number of consecutive noise-batch pairs to interpolate between.
        interval: number of frames per pair, both endpoints included.
        duration: display time of each GIF frame, in milliseconds.
        output_path: destination of the animated GIF; its parent folder is
            created when missing.

    Raises:
        ValueError: if ``steps`` or ``interval`` is smaller than 1, since no
            frame would be produced.
    """
    if steps < 1 or interval < 1:
        raise ValueError("steps and interval must both be at least 1")

    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    frames = []

    # Noise is kept as plain NumPy arrays: np.linspace needs arrays anyway, and
    # each interpolated batch is wrapped in a tensor right before the forward pass.
    start_noise = np.random.normal(0, 1, (samples_num, noise_size))

    for _ in range(steps):
        end_noise = np.random.normal(0, 1, (samples_num, noise_size))

        for vectors in np.linspace(start_noise, end_noise, interval):
            generated_images = (
                generator(nnet.tensor(vectors, device=device))
                .reshape(-1, 28, 28)
                .to("cpu")
                .detach()
                .numpy()
                # Map the generator's Tanh range [-1, 1] to pixels [0, 255].
                * 127.5
                + 127.5
            )

            # get_images_set already returns a mode-"L" image; only the
            # palette conversion is needed here.
            frames.append(get_images_set(generated_images).convert("P"))

        # The end of this segment is the start of the next one, so the
        # animation moves continuously through the chain of noise batches.
        start_noise = end_noise

    frames[0].save(
        output_path,
        save_all=True,
        append_images=frames[1:],
        optimize=False,
        duration=duration,
        loop=0,
    )


In [None]:
# Render the latent-space interpolation animation.
create_vectors_interpolation()


# Build one grid frame per training epoch from the stored snapshots; the
# samples are rescaled from [-1, 1] to [0, 255] before tiling.
each_epoch_generated_images = [
    get_images_set(epoch_samples * 127.5 + 127.5).convert("L").convert("P")
    for epoch_samples in each_epoch_generated_samples
]

# Write the frames out as a looping GIF that shows the training progress.
training_gif_path = "generated images/gan_training_process.gif"
first_frame = each_epoch_generated_images[0]
remaining_frames = each_epoch_generated_images[1:]
first_frame.save(
    training_gif_path,
    save_all=True,
    append_images=remaining_frames,
    optimize=False,
    duration=150,
    loop=0,
)
