# Generative Adversarial Network (GAN)

![](https://nbviewer.jupyter.org/github/hse-aml/intro-to-dl/blob/master/week4/images/gan.png)

Пришло время поговорить о более интересных архитектурах, а именно о GANах или состязательных нейронных сетках. [Впервые GANы были предложены в 2014 году.](https://arxiv.org/abs/1406.2661) Сейчас они очень активно исследуются. GANы состоят из двух нейронных сетей: 

* Первая - генератор порождает из некоторого заданного распределения случайные числа и собирает из них объекты, которые идут на вход второй сети. 
* Вторая - дискриминатор получает на вход объекты из реальной выборки и объекты, созданные генератором. Она пытается определить какой объект был порождён генератором, а какой является реальным.

Таким образом генератор пытается создавать объекты, которые дискриминатор не сможет отличить от реальных. 

In [None]:
import tensorflow as tf
print(tf.__version__)

In [None]:
import numpy as np
import time 

import matplotlib.pyplot as plt
%matplotlib inline

# 1. Данные

Для начала давайте попробуем погонять модели на рукописных цифрах из MNIST как бы скучно это не было. 

In [None]:
(X, _ ), (_, _) = tf.keras.datasets.mnist.load_data()

In [None]:
X = X/127.5 - 1 # отнормировали данные на отрезок [-1, 1]

In [None]:
X.min(), X.max()  # проверили нормировку

In [None]:
X = X[:,:,:,np.newaxis]
X.shape

Давайте вытащим несколько рандомных картинок и нарисуем их.

In [None]:
cols = 8
rows = 2
fig = plt.figure(figsize=(2 * cols - 1, 2.5 * rows - 1))
for i in range(cols):
    for j in range(rows):
        random_index = np.random.randint(0, X.shape[0])
        ax = fig.add_subplot(rows, cols, i * rows + j + 1)
        ax.grid(False)
        ax.axis('off')
        ax.imshow(np.squeeze(X,-1)[random_index, :], cmap='gray')
plt.show()

Соберём для наших данных удобный генератор. 

# 2. Дискриминатор 

* Дискриминатор - это обычная свёрточная сетка 
* Цель этой сетки - отличать сгенерированные изображения от реальных

In [None]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras import layers as L

In [None]:
IMG_SHAPE = X.shape[1:]

In [None]:
discriminator = Sequential()

# Ваш код 

# слой Conv2D 64  фильтра, kernel size 5x5, страйд 2 по обеим осям
# бачнорм + LeakyReLU()
# слой Conv2D 128  фильтров, kernel size 5x5, страйд 2 по обеим осям
# бачнорм + LeakyReLU()
# Flatten
# классификация на 2 класса


# на выход из дискриминатора мы забираем логарифм, а не вероятность 
discriminator.add(L.Dense(2, activation=tf.nn.log_softmax))

# 3. Генератор

* Генерирует из шума изображения 

Будем генерировать новых Симпсонов из шума размера 256.

In [None]:
CODE_SIZE = 100

In [None]:
generator = Sequential()

generator.add(L.InputLayer([CODE_SIZE],name='noise'))

generator.add(L.Dense(256*7*7, activation='elu'))
generator.add(L.Reshape((7,7,256)))

generator.add(L.Conv2DTranspose(128, kernel_size=(3,3)))
generator.add(L.LeakyReLU())

generator.add(L.Conv2DTranspose(64, kernel_size=(3,3)))
generator.add(L.LeakyReLU())

generator.add(L.UpSampling2D(size=(2,2)))
generator.add(L.Conv2DTranspose(32,kernel_size=3,activation='elu'))
generator.add(L.Conv2DTranspose(32,kernel_size=3,activation='elu'))
generator.add(L.Conv2DTranspose(32,kernel_size=3,activation='elu'))

generator.add(L.Conv2D(1, kernel_size=3, padding='same'))

print('Выход генератора: ', generator.output_shape[1:])

Посмотрим на пример, который нам генерирует на выход наша свежая нейронка! 

In [None]:
noise = tf.random.normal([1, CODE_SIZE])
noise.shape

In [None]:
generated_image =  generator(noise)

plt.imshow(generated_image[0, :, :, 0], cmap='gray');

Хммм... А что про это всё думает дескриминатор?

In [None]:
decision = discriminator(generated_image)

# на выход из дискриминатора мы забираем логарифм!
np.exp(decision)

# 4. Функция потерь 

Потери для дескриминатора это обычныя кросс-энтропия.

In [None]:
# Потери для дискриминатора 
def discriminator_loss(logp_real, logp_gen):

    # Ваш код

    return d_loss

In [None]:
real_log = discriminator(X[:1])
gen_log = discriminator(generated_image)

discriminator_loss(real_log, gen_log)

Для генератора мы хотим максимизировать ошибку дискриминатора на фэйковых примерах. 

In [None]:
# Потери для генератора
def generator_loss(logp_gen):
    
    # Ваш код 
    
    return g_loss

In [None]:
generator_loss(gen_log)

# 5. Градиентный спуск

Учить пару из сеток будем так: 

* Делаем $k$ шагов обучения дискриминатора. Целевая переменная - реальный объект перед нами или порождённый. Веса изменяем стандартно, пытаясь уменьшить кросс-энтропию.
* Делаем $m$ шагов обучения генератора. Веса внутри сетки меняем так, чтобы увеличить логарифм вероятности дискриминатора присвоить сгенерированному объекту лэйбл реального. 
* Обучаем итеративно до тех пор, пока дискриминатор больше не сможет найти разницу (либо пока у нас не закончится терпение).
* При обучении может возникнуть огромное количество пробем от взрыва весов до более тонких вещей. Имеет смысл посмотреть на разные трюки, используемые при обучении:  https://github.com/soumith/ganhacks

Собираем структуру для обучения.

In [None]:
generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)
#discriminator_optimizer = tf.keras.optimizers.SGD(1e-3)

Чекпойнты для процесса обучения.

In [None]:
import os 
checkpoint_dir = './training_checkpoints'

checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")

checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

Задаём один шаг процедуры обучения генератора.

In [None]:
@tf.function
def train_generator_step(images, noise):

    # ищем градиенты 
    with tf.GradientTape() as gen_tape:
        
        # ВАШ КОД
        
        # сгенерировали новое изображение из шума
        # посчитали прогнозы дискриминатора
        # нашли ошибку
        gen_loss = ...
        
    # нашли градиенты
    grad = gen_tape.gradient(gen_loss, generator.trainable_variables)
        
    # сделали шаг градиентного спуска 
    generator_optimizer.apply_gradients(zip(grad, generator.trainable_variables))

Теперь шаг обучения дискриминатора. 

In [None]:
@tf.function
def train_discriminator_step(images, noise):
    
   # ищем градиенты 
    with tf.GradientTape() as gen_tape:
        
        # ВАШ КОД
        
        # сгенерировали новое изображение из шума       
        # посчитали прогнозы дискриминатора
        # нашли ошибку
        dis_loss = ...
        
    # нашли градиенты
    grad = gen_tape.gradient(dis_loss, discriminator.trainable_variables)
        
    # сделали шаг градиентного спуска 
    generator_optimizer.apply_gradients(zip(grad, discriminator.trainable_variables))

> Обратите внимание, что можно реализовать функцию для обучения сразу для обеих нейронок, а не как мы по отдельности. В случае совместной реализации код будет работать быстрее. 

Мы почти готовы учить нашу сетку. Напишем две простенькие функции для генерации фэйковых и настоящих батчей. 

In [None]:
# функция, которая генерирует батч с шумом
def sample_noise_batch(bsize):
    return tf.random.normal([bsize, CODE_SIZE], dtype=tf.float32)

# функция, которая генерирует батч из реальных данных (для баловства)
def sample_data_batch(bsize):
    idxs = np.random.choice(np.arange(X.shape[0]), size=bsize)
    return X[idxs]

Проверяем отрабатывают ли наши шаги.

In [None]:
data_test = sample_data_batch(256)
fake_test = sample_noise_batch(256)

gen_log = discriminator(generator(fake_test))
real_log = discriminator(data_test)

print('Ошибка дескриминатора:', discriminator_loss(real_log, gen_log).numpy())
print('Ошибка генератора:', generator_loss(gen_log).numpy())

In [None]:
# сделали шаг работы генератора
train_generator_step(data_test, fake_test)

gen_log = discriminator(generator(fake_test))
real_log = discriminator(data_test)

print('Ошибка дескриминатора:', discriminator_loss(real_log, gen_log).numpy())
print('Ошибка генератора:', generator_loss(gen_log).numpy())

In [None]:
# сделали шаг работы дискриминатора
train_discriminator_step(data_test, fake_test)

gen_log = discriminator(generator(fake_test))
real_log = discriminator(data_test)

print('Ошибка дескриминатора:', discriminator_loss(real_log, gen_log).numpy())
print('Ошибка генератора:', generator_loss(gen_log).numpy())

Как думаете, выглядит адекватно? Мы нигде не ошиблись? 

Напишем пару вспомогательных функций для отрисовки картинок. 

In [None]:
# рисуем изображения
def sample_images(rows, cols, num=0):
    images = generator.predict(sample_noise_batch(bsize=rows*cols))
    
    fig = plt.figure(figsize=(2 * cols - 1, 2.5 * rows - 1))
    for i in range(cols):
        for j in range(rows):
            ax = fig.add_subplot(rows, cols, i * rows + j + 1)
            ax.grid('off')
            ax.axis('off')
            ax.imshow(np.squeeze(images[i * rows + j],-1),cmap='gray')
    
    # сохраняем картинку для гифки
    if num >0:
        plt.savefig('images_gan/image_at_epoch_{:04d}.png'.format(num))
    plt.show()


# рисуем распределения
def sample_probas(X):
    plt.title('Generated vs real data')
    
    plt.hist(np.exp(discriminator.predict(X))[:,1],
             label='D(x)', alpha=0.5,range=[0,1])
    
    plt.hist(np.exp(discriminator.predict(generator.predict(sample_noise_batch(X.shape[0]))))[:,1],
             label='D(G(z))',alpha=0.5,range=[0,1])
    
    plt.legend(loc='best')
    plt.show()

In [None]:
sample_images(2,7)

In [None]:
sample_probas(X[:100]) 

Немного побалуемся с шагами. 

In [None]:
data_test = sample_data_batch(256)
fake_test = sample_noise_batch(256)

# Генератор
train_generator_step(data_test, fake_test)

gen_log = discriminator(generator(fake_test))
real_log = discriminator(data_test)

print('Ошибка дескриминатора:', discriminator_loss(real_log, gen_log).numpy())
print('Ошибка генератора:', generator_loss(gen_log).numpy())

In [None]:
sample_images(2,7)

In [None]:
data_test = sample_data_batch(256)
fake_test = sample_noise_batch(256)

# Дискриминатор
train_discriminator_step(data_test, fake_test)

gen_log = discriminator(generator(fake_test))
real_log = discriminator(data_test)

print('Ошибка дескриминатора:', discriminator_loss(real_log, gen_log).numpy())
print('Ошибка генератора:', generator_loss(gen_log).numpy())

# 6. Обучение

Ну и наконец последний шаг. Тренировка сеток.  При обучении нужно соблюдать между сетками баланс. Важно сделать так, чтобы ни одна из них не стала сразу же побеждать. Иначе обучение остановится. 

* Чтобы избежать моментального выигрыша дискриминатора, мы добавили в его функцию потерь $l_2$ регуляризацию. 
* Кроме регуляризации можно пытаться учить модели сбалансированно, делая внутри цикла шаги чуть более умным способом. 


In [None]:
from IPython import display

EPOCHS = 5000
BSIZE = 256

# время
start = time.time()/60

# вектора для мониторинга сходимости сеток
d_losses = [ ]
g_losses = [ ]

num = 0 # для сохранения картинок 

# запускаем цикл обучения 
for epoch in range(EPOCHS):
    
    # генерируем батч
    X_batch = sample_data_batch(BSIZE)
    X_fake = sample_noise_batch(BSIZE)
    
    # делаем N шагов обучения дискриминатора
    for i in range(5):
        train_discriminator_step(X_batch, X_fake)
        
    # делаем K шагов обучения генератора
    for i in range(1):
        train_generator_step(X_batch, X_fake)

    gen_log = discriminator(generator(X_fake))
    real_log = discriminator(X_batch) 
    
    d_losses.append(discriminator_loss(real_log, gen_log).numpy())
    g_losses.append(generator_loss(gen_log).numpy())
        
    # ну сколько можно ждааать!!! 
    if epoch % 1==0:
        print('Time for epoch {} is {} min'.format(epoch + 1, time.time()/60-start))
        print('error D: {}, error G: {}'.format(d_losses[-1], g_losses[-1]))

    if epoch % 10==0:
        # сохраняем модель и обновляем картинку
        # checkpoint.save(file_prefix = checkpoint_prefix)

        # можно раскоментировать, если хочется, чтобы картинка обновлялась, а не дополнялас
        #display.clear_output(wait=True)
        num += 1
        sample_images(2,7, num)
        sample_probas(X_batch)

Тренируем сетки.

In [None]:
# сетка тренировалась много итераций
sample_images(4,8)  

In [None]:
# смотрим сошлись ли потери
plt.plot(d_losses, label='Discriminator')
plt.plot(g_losses, label='Generator')
plt.ylabel('loss')
plt.legend();

# 7. Интерполяция 

Давайте попробуем взять два вектора, сгенерированных из нормального распределения и посмотреть как один из них перетекакет в другой. 

In [None]:
from scipy.interpolate import interp1d

def show_interp_samples(point1, point2, N_samples_interp):
    N_samples_interp_all = N_samples_interp + 2

    # линия между двумя точками
    line = interp1d([1, N_samples_interp_all], np.vstack([point1, point2]), axis=0)

    fig = plt.figure(figsize=(15,4))
    for i in range(N_samples_interp_all):
        ax = fig.add_subplot(1, 2 + N_samples_interp, i+1)
        ax.grid('off')
        ax.axis('off')
        ax.imshow(generator.predict(line(i + 1).reshape((1, 100)))[0, :, :, 0],cmap='gray')
    plt.show()
    pass

In [None]:
np.random.seed(seed=42)

# Рандомная точка в пространстве
noise_1 = np.random.normal(0, 1, (1, 100))

# смотрим как она перетекает в симметричкную
show_interp_samples(noise_1, -noise_1, 6)

In [None]:
noise_2 = np.random.normal(0, 1, (1, 100))
show_interp_samples(noise_1, noise_2, 6)

А что мы вообще сгенерировали?! Давайте посмотрим на точку из выборки наиболее близкую к получившейся генерации.

In [None]:
id_label_sample = 8
img_smp = generator.predict(sample_noise_batch(1))
plt.imshow(img_smp[0,:,:,0], cmap='gray')

In [None]:
img_smp.shape, X.shape

In [None]:
# ищем l1 норму между тем, что сгенерилось и остальным 
L1d = np.sum(np.sum(np.abs(X[:,:,:,0] - img_smp[:,:,:,0]), axis=1), axis=1)
idx_l1_sort = L1d.argsort()
idx_l1_sort.shape

In [None]:
idx_l1_sort[:5]

In [None]:
N_closest = 8

fig = plt.figure(figsize=(15,4))
for i in range(N_closest):
    ax = fig.add_subplot(1, N_closest, i+1)
    ax.grid('off')
    ax.axis('off')
    ax.imshow(X[idx_l1_sort[i], :, :, 0], cmap='gray')
plt.show()

Сохраняю гифку из картинок. 

In [None]:
import os
import glob
import imageio

def create_animated_gif(files, animated_gif_name, pause=0):
    if pause != 0:
        
        frames = []
        for file in files:
            count = 0
            while count < pause:
                frames.append(file)
                count+=1
        print("Total number of frames in the animation:", len(frames))
        files = frames
    images = [imageio.imread(file) for file in files]
    imageio.mimsave(animated_gif_name, images, duration = 0.005)

In [None]:
pause = 1
animated_gif_name = 'animation_GAN.gif'

In [None]:
image_path = 'images_gan/*.png'
files = glob.glob(image_path)
files = sorted(files, key = lambda w: int(w.split('_')[-1].split('.')[0]))
create_animated_gif(files, animated_gif_name, pause)

# Задание : 

* Превратить нашу GAN в Conditional GAN 

![](https://camo.githubusercontent.com/63a263678253a1eedd74432ad85751da2407a3d8/687474703a2f2f6775696d70657261726e61752e636f6d2f66696c65732f626c6f672f46616e7461737469632d47414e732d616e642d77686572652d746f2d66696e642d7468656d2f6347414e5f6f766572766965772e6a7067)

На этом всё :) 

![](https://miro.medium.com/max/896/1*3VOLkgm-QY05gEpGDkBzTA.gif)