# Lab14-2 GAN
110065508 李丞恩

In [1]:
# Layout of the preview grid that is rendered from the generator after every epoch.
SAMPLE_COL = 16
SAMPLE_ROW = 16
SAMPLE_NUM = SAMPLE_COL * SAMPLE_ROW  # number of fixed latent vectors used for the preview grid

# Image geometry: height, width and channel count of the images fed to / produced by the models.
IMG_H = 64
IMG_W = 64
IMG_C = 3
IMG_SHAPE = (IMG_H, IMG_W, IMG_C)

BATCH_SIZE = 20
Z_DIM = 128  # length of the latent vector consumed by the generator
BZ = (BATCH_SIZE, Z_DIM)  # shape of one batch of latent vectors
BUF = 65536  # NOTE(review): not referenced in the visible notebook; presumably a shuffle buffer size — confirm

# NOTE(review): DC_LR / DC_EPOCH are not referenced in the visible notebook; presumably DCGAN settings — confirm.
DC_LR = 2.5e-04
DC_EPOCH = 256

W_LR = 2.0e-04  # learning rate passed to both Adam optimizers
W_EPOCH = 256  # number of training epochs
# NOTE(review): the clip bounds are not referenced in the visible notebook (the training step
# explicitly does no weight clipping); presumably left over from a weight-clipping WGAN — confirm.
WClipLo = -0.01
WClipHi = 0.01

In [2]:
%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
import os

import tensorflow as tf
import tensorflow.keras as keras
import imageio
import moviepy.editor as mpy
import IPython.display as display
import matplotlib.pyplot as plt
import pathlib
import random
import re
from pprint import pprint
from tqdm.notebook import tqdm
from tensorflow.keras import utils, datasets, layers, models
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.utils import shuffle

2021-12-20 00:40:07.464500: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


In [3]:
# NOTE(review): this is set after `import tensorflow` (previous cell), and INFO ("I ...") lines
# still show up in the output of the following cell, so it does not appear to take effect here.
# It probably has to be set before TensorFlow is imported — confirm.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # disable warnings and info

In [4]:
# Configure the first visible GPU: allocate memory on demand and cap usage at 10000 MB.
# Guarded so the cell does not crash with IndexError on a machine without a GPU.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=10000)],
        )
    except RuntimeError as err:
        # Device settings can only be changed before the GPU has been initialized.
        print(err)
else:
    print("No GPU detected; TensorFlow will run on the CPU.")

2021-12-20 00:40:08.423131: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-12-20 00:40:08.453800: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-12-20 00:40:08.454274: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:01:00.0 name: NVIDIA GeForce RTX 3060 computeCapability: 8.6
coreClock: 1.852GHz coreCount: 28 deviceMemorySize: 11.77GiB deviceMemoryBandwidth: 335.32GiB/s
2021-12-20 00:40:08.454296: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-12-20 00:40:08.456109: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-12-20 00:40:08.456256: I tensorflow/stream_executor/pl

In [5]:
# Create the working directories used by the notebook.
# `exist_ok=True` makes the cell safe to re-run; "output" is created as the parent of the others.
for directory in ("input", "output", "output/gif", "output/imgs_HW", "output/imgs_not_HW"):
    os.makedirs(directory, exist_ok=True)

## 一. 製作CelebA的tfrecord檔

### 1. Write tfrecords

In [None]:
# Collect every PNG file directly under the dataset directory.
data_root = pathlib.Path('./input/datalab-lab-14-2')
# Compare the real file suffix: the previous `path[-3:] in ('png')` was a substring test against
# the string 'png' (the parentheses do not make a tuple). Sorting makes the record order
# reproducible, since glob order is filesystem-dependent.
all_image_paths = sorted(
    str(path) for path in data_root.glob('*') if path.suffix.lower() == '.png'
)
image_count = len(all_image_paths)
print('\ntotal img num:', image_count)

In [None]:
def _bytes_feature(value):
    """Wrap one bytes/str value in a bytes-list ``tf.train.Feature``.

    If ``value`` is an eager tensor (same type as ``tf.constant(0)``), its
    ``.numpy()`` content is used instead of the tensor object.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[raw]))

def to_tfrecord(img):
    """Build a ``tf.train.Example`` that stores ``img`` under the "image" key."""
    features = tf.train.Features(feature={"image": _bytes_feature(img)})
    return tf.train.Example(features=features)

def prepare_tfrecords(dataset_path, tfrecord_file):
    """Serialize every image file into a single TFRecord file.

    Args:
        dataset_path: sequence of image file paths; each file is stored as raw
            (still encoded) bytes, in the given order.
        tfrecord_file: path of the TFRecord file to write.
    """
    with tf.io.TFRecordWriter(tfrecord_file) as out_file:
        for image_path in dataset_path:
            # Use a context manager so each file handle is closed right away
            # instead of leaking one open handle per image.
            with open(image_path, 'rb') as image_file:
                img = image_file.read()
            example = to_tfrecord(img)
            out_file.write(example.SerializeToString())

In [None]:
# Build the TFRecord file only when it does not exist yet, so the cell is safe under
# "Restart & Run All" and the expensive conversion is done just once.
# Skipped when no images were found, to avoid caching an empty record file.
if all_image_paths and not os.path.exists('./input/dataset.tfrecord'):
    prepare_tfrecords(all_image_paths, './input/dataset.tfrecord')

### 2. Preprocess the data

In [None]:
# Create a description of the features.
# Each serialized example carries a single scalar string feature, "image"
# (the key written by `to_tfrecord`); a missing value parses as an empty string.
feature_description = {
    'image': tf.io.FixedLenFeature([], tf.string, default_value=''),
}

def _parse_function(example_proto):
    """Decode one serialized ``tf.Example`` into a normalized image tensor.

    Args:
        example_proto: scalar string tensor holding a serialized example that
            matches ``feature_description``.

    Returns:
        float32 tensor of shape (IMG_H, IMG_W, IMG_C) with values in [0, 1].
    """
    feature_dict = tf.io.parse_single_example(example_proto, feature_description)
    img = tf.io.decode_png(feature_dict['image'], channels=IMG_C)
    img = tf.image.resize(img, (IMG_H, IMG_W))  # float32, still in the 0..255 range
    # The generator ends in a sigmoid, i.e. produces values in [0, 1]. Real images must
    # use the same range, otherwise the discriminator can separate them by scale alone.
    return img / 255.0

In [None]:
def read_dataset(tfrecord_file):
    """Create the batched training dataset from a TFRecord file.

    Args:
        tfrecord_file: path of the TFRecord file produced by ``prepare_tfrecords``.

    Returns:
        A ``tf.data.Dataset`` yielding image batches of exactly ``BATCH_SIZE`` elements.
    """
    autotune = tf.data.experimental.AUTOTUNE
    dataset = tf.data.TFRecordDataset(tfrecord_file)
    dataset = dataset.map(_parse_function, num_parallel_calls=autotune)
    # Drop the final partial batch: the training steps build noise tensors with a fixed
    # BATCH_SIZE, so a smaller last batch would not combine with them.
    dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
    # Overlap input decoding with the training step.
    dataset = dataset.prefetch(autotune)
    return dataset

### 3. Read dataset

In [None]:
# Training dataset: batches of decoded images read from the TFRecord file.
dsTrain = read_dataset('./input/dataset.tfrecord')

In [None]:
# Display the dataset object as a quick sanity check.
dsTrain

## 二. 設計GAN的架構

In [None]:
def GAN(img_shape, z_dim):
    """Build the generator / discriminator pair.

    Args:
        img_shape: (height, width, channels) of the images.
        z_dim: length of the latent vector consumed by the generator.

    Returns:
        ``(generator, discriminator)`` as two ``keras.Sequential`` models.
        The generator maps a latent vector to an image through a sigmoid output;
        the discriminator maps an image to a single unbounded score.
    """
    L = keras.layers
    height, width, channels = img_shape
    # Spatial size of the feature map before the two stride-2 transposed convolutions.
    seed_h = height // 4
    seed_w = width // 4

    generator = keras.Sequential([
        L.Dense(units=1024, input_shape=(z_dim,)),
        L.BatchNormalization(),
        L.ReLU(),
        L.Dense(units=seed_h * seed_w * 256),
        L.BatchNormalization(),
        L.ReLU(),
        L.Reshape(target_shape=(seed_h, seed_w, 256)),
        L.Conv2DTranspose(filters=32, kernel_size=5, strides=2, padding="SAME"),
        L.BatchNormalization(),
        L.ReLU(),
        L.Conv2DTranspose(
            filters=channels,
            kernel_size=5,
            strides=2,
            padding="SAME",
            activation=keras.activations.sigmoid,
        ),
    ])

    # NOTE(review): the discriminator uses batch normalization; when it is trained with a
    # per-sample gradient penalty, a normalization that does not mix samples may be a
    # better fit — confirm against the WGAN-GP paper.
    discriminator = keras.Sequential([
        L.Conv2D(
            filters=32,
            kernel_size=5,
            strides=(2, 2),
            padding="SAME",
            input_shape=img_shape,
        ),
        L.LeakyReLU(),
        L.Conv2D(filters=128, kernel_size=5, strides=(2, 2), padding="SAME"),
        L.BatchNormalization(),
        L.LeakyReLU(),
        L.Flatten(),
        L.Dense(units=1024),
        L.BatchNormalization(),
        L.LeakyReLU(),
        L.Dense(units=1),
    ])

    return generator, discriminator

# Fixed batch of latent vectors, reused after every epoch to render the preview grid.
s = tf.random.normal([SAMPLE_NUM, Z_DIM])

In [None]:
# Improved-WGAN generator (IWG) and discriminator (IWD), each with its own Adam optimizer.
IWG, IWD = GAN(IMG_SHAPE, Z_DIM)
optimizer_g = keras.optimizers.Adam(W_LR, beta_1=0, beta_2=0.9)
optimizer_d = keras.optimizers.Adam(W_LR, beta_1=0, beta_2=0.9)

## 三. GAN的訓練函數定義

In [None]:
def _iw_losses(x, gp_weight=10.0):
    """Compute the improved-WGAN (gradient penalty) losses for one batch of real images.

    Must be called inside the caller's ``tf.GradientTape`` context so that the
    returned losses can be differentiated w.r.t. the model variables.

    Args:
        x: batch of real images, shape (batch, H, W, C).
        gp_weight: coefficient of the gradient penalty term.

    Returns:
        ``(lg, ld)``: scalar generator loss and scalar discriminator loss.
    """
    # Use the actual batch size instead of the BATCH_SIZE constant so a smaller
    # final batch still works.
    batch = tf.shape(x)[0]
    z = tf.random.normal([batch, Z_DIM])  # latent variable z ~ p(z)
    epsilon = tf.random.uniform([batch, 1, 1, 1])  # epsilon ~ U[0, 1], one value per sample

    x_tilde = IWG(z, training=True)  # generated images
    x_hat = epsilon * x + (1.0 - epsilon) * x_tilde  # random point between real and generated

    d_real = IWD(x, training=True)
    d_fake = IWD(x_tilde, training=True)

    # Gradient penalty: the norm of dD(x_hat)/dx_hat should be 1 for every sample.
    with tf.GradientTape() as tp_gradientpenalty:
        tp_gradientpenalty.watch(x_hat)
        d_hat = IWD(x_hat, training=True)
    grad = tp_gradientpenalty.gradient(d_hat, x_hat)
    # Per-sample L2 norm; the small constant keeps the sqrt differentiable at zero.
    grad_norm = tf.sqrt(tf.reduce_sum(tf.square(grad), axis=[1, 2, 3]) + 1e-12)
    penalty = gp_weight * tf.reduce_mean(tf.square(grad_norm - 1.0))

    ld = tf.reduce_mean(d_fake) - tf.reduce_mean(d_real) + penalty
    lg = -tf.reduce_mean(d_fake)
    return lg, ld


@tf.function
def IWGTrain(c1):
    """Run one generator update on the real-image batch ``c1``.

    Returns:
        ``(lg, ld)``: generator and discriminator losses for this step.
    """
    with tf.GradientTape() as tpg:
        lg, ld = _iw_losses(c1)

    gradient_g = tpg.gradient(lg, IWG.trainable_variables)
    optimizer_g.apply_gradients(zip(gradient_g, IWG.trainable_variables))
    return lg, ld


@tf.function
def IWDTrain(c1):
    """Run one discriminator update on the real-image batch ``c1``.

    Returns:
        ``(lg, ld)``: generator and discriminator losses for this step.
    """
    with tf.GradientTape() as tpd:
        lg, ld = _iw_losses(c1)

    gradient_d = tpd.gradient(ld, IWD.trainable_variables)
    optimizer_d.apply_gradients(zip(gradient_d, IWD.trainable_variables))  # No weight clipping in improved WGAN!
    return lg, ld

In [None]:
# Training schedule: five discriminator steps followed by one generator step (D:G = 5:1).
WTrain = (IWDTrain,) * 5 + (IWGTrain,)

# Number of steps in one full pass through the schedule.
WCritic = len(WTrain)

## 四. 訓練過程視覺化

In [None]:
# Utility function
def utPuzzle(imgs, row, col, path=None):
    """Tile equally sized images into one ``row`` x ``col`` grid image.

    Images are placed in row-major order; grid cells without an image stay zero.

    Args:
        imgs: sequence of images, each of shape (h, w, c).
        row: number of grid rows.
        col: number of grid columns.
        path: if given, the grid is also written to this file via ``imageio``.

    Returns:
        uint8 array of shape (h * row, w * col, c).
    """
    tile_h, tile_w, channels = imgs[0].shape
    canvas = np.zeros((tile_h * row, tile_w * col, channels), np.uint8)
    for idx, tile in enumerate(imgs):
        top = (idx // col) * tile_h
        left = (idx % col) * tile_w
        canvas[top:top + tile_h, left:left + tile_w, :] = tile
    if path is not None:
        imageio.imwrite(path, canvas)
    return canvas
  
def utMakeGif(imgs, fname, duration):
    """Write the frames in ``imgs`` to a GIF file that plays for ``duration`` seconds.

    Args:
        imgs: indexable sequence of frames.
        fname: output GIF path.
        duration: clip length; the frame rate is ``len(imgs) / duration``.
    """
    frames_per_second = float(len(imgs)) / duration

    def frame_at(t):
        # Map the clip time t to the index of the frame shown at that time.
        return imgs[int(frames_per_second * t)]

    clip = mpy.VideoClip(frame_at, duration=duration)
    clip.write_gif(fname, fps=frames_per_second)

In [None]:
wlg = [None] * W_EPOCH  # generator loss per epoch
wld = [None] * W_EPOCH  # discriminator loss per epoch
wsp = [None] * W_EPOCH  # preview grid image per epoch

# Scale factor that turns the summed per-batch losses of one epoch into an average.
rsTrain = float(BATCH_SIZE) / float(image_count)
ctr = 0  # position in the WTrain schedule; carried across epochs
for ep in tqdm(range(W_EPOCH)):
    lgt = 0.0
    ldt = 0.0
    for c1 in dsTrain:
        lg, ld = WTrain[ctr](c1)
        ctr = (ctr + 1) % WCritic
        lgt += lg.numpy()
        ldt += ld.numpy()
    wlg[ep] = lgt * rsTrain
    wld[ep] = ldt * rsTrain

    # Render the fixed latent vectors and save the grid for this epoch.
    out = IWG(s, training = False)
    img = utPuzzle(
        (out * 255.0).numpy().astype(np.uint8),
        SAMPLE_ROW,  # utPuzzle expects (row, col); the two were passed in swapped order
        SAMPLE_COL,
        "./output/imgs_HW/iw_%04d.png" % ep
    )
    wsp[ep] = img
    if (ep + 1) % 32 == 0:
        # Show the full RGB grid; slicing channel 0 with a gray colormap displayed
        # only the red channel of the colour samples.
        plt.imshow(img)
        plt.axis("off")
        plt.title("Epoch %d" % (ep + 1))
        plt.show()

In [None]:
# Assemble the per-epoch preview grids into a 2-second GIF.
utMakeGif(np.array(wsp), "./output/gif/improved_wgan_celebA_110065508.gif", duration = 2)

In [None]:
# Plot the per-epoch discriminator and generator losses on one set of axes.
epochs = range(W_EPOCH)
fig, ax = plt.subplots()
ax.plot(epochs, wld, color = "blue", label = "Discriminator Loss")
ax.plot(epochs, wlg, color = "red", label = "Generator Loss")
ax.legend(loc = "upper right")
ax.set_xlabel("Epochs")
ax.set_ylabel("Loss")
ax.set_title("Improved WGAN Training Loss")
plt.show()