In [None]:
%matplotlib inline

# 美少女無窮生成 cntk

![md_images](../Images/gan.jpg)

![md_images](../Images/rasgan.png)

In [None]:
import matplotlib
import matplotlib.pyplot as plt
import pylab
import PIL
from PIL import Image
import numpy as np
import os
import datetime
import time
import glob
import pylab
import cv2
import math
import string
import cntk as C
from cntk.ops import *
from cntk.initializer import *
from cntk.ops.functions import *
from cntk.layers import *
from cntk.losses import *
from cntk.train import *
from cntk.learners import *
from cntk.metrics import *
from cntk.device import *
import random 



# Whether to run on the GPU; set to False to fall back to the CPU.
is_gpu = True
# Ask CNTK to use GPU 0 when is_gpu is set, otherwise the CPU.
# The returned status of try_set_default_device is not inspected.
try_set_default_device(gpu(0) if is_gpu else cpu())

In [None]:
# Image geometry shared by the notebook; the generator's first Dense layer
# is sized from img_h and img_w.
img_h = 64
img_w = 64
# Channel count (not referenced by the code visible in this notebook).
img_c = 3

#產生高斯分布噪音
def noise_sample(num_samples, g_input_dim=100):
    """Draw generator input noise from a standard normal distribution.

    Args:
        num_samples: number of noise vectors (rows) to draw.
        g_input_dim: length of each noise vector (columns).

    Returns:
        float32 array of shape (num_samples, g_input_dim).
    """
    latent_shape = (num_samples, g_input_dim)
    gaussian = np.random.normal(size=latent_shape)
    return gaussian.astype(np.float32)

# Read cursor into the path list `s`; next_minibatch advances and resets it.
idx = 0
# Collect every .jpg path in the resized-images folder.
s = glob.glob('../Data/ex08_train/resized_images/' + '*.jpg')
# Report how many images were found.
print('{0}張圖片'.format(len(s)))
# Randomize the visiting order in place.
random.shuffle(s)

In [None]:
#圖片轉向量
def img2array(img: "Image.Image"):
    """Convert an image to a float32 array in channel-first, BGR order.

    Args:
        img: a PIL image (or anything ``np.array`` accepts) laid out as
            height x width x channel with the channels in RGB order.

    Returns:
        C-contiguous float32 array of shape (channel, height, width) whose
        channel axis is reversed (RGB -> BGR).
    """
    hwc = np.array(img).astype(np.float32)
    chw = hwc.transpose(2, 0, 1)  # HWC -> CHW
    # Reverse the channel axis *before* making the data contiguous: slicing
    # with [::-1] afterwards would hand back a negative-stride view and undo
    # the contiguity that ascontiguousarray is meant to guarantee.
    return np.ascontiguousarray(chw[::-1])

#向量轉圖片
def array2img(arr: np.ndarray):
    """Convert a channel-first BGR array back into a PIL image.

    Args:
        arr: array of shape (channel, height, width) in BGR channel order.

    Returns:
        PIL image built from the uint8 height x width x channel data, with
        values limited to the range 0..255.
    """
    rgb_chw = arr[::-1]  # BGR -> RGB
    rgb_hwc = np.transpose(rgb_chw, (1, 2, 0))  # CHW -> HWC
    # Cap at 255 first, then floor at 0, so every value fits in a uint8.
    capped = np.minimum(255, rgb_hwc)
    bounded = np.maximum(0, capped)
    return Image.fromarray(bounded.astype(np.uint8))

#隨機加入標準常態分配的噪聲
def add_noise(image):
    """Add scaled standard-normal noise to an image and clip to 0..255.

    Args:
        image: numeric array holding the pixel values.

    Returns:
        Array of the same shape as ``image`` with the noise added and every
        value clipped to the range [0, 255].
    """
    gaussian = np.random.standard_normal(image.shape)
    # One integer scale per call, picked from -5..4; a scale of 0 leaves the
    # image unchanged apart from the clipping.
    scale = np.random.choice(np.arange(-5, 5))
    noisy = image + gaussian * scale
    return np.clip(noisy, 0, 255)

#調整明暗
def adjust_gamma(image,gamma=1.2):
    """Apply gamma correction to an image with pixel values in 0..255.

    The previous implementation built the lookup table and called cv2.LUT but
    discarded the result, so the input came back unchanged. The table is now
    applied with NumPy indexing, which is element-wise and therefore needs no
    CHW <-> HWC transposes.

    Args:
        image: array of pixel values; any layout, values expected in 0..255.
        gamma: gamma value; values above 1 brighten, values below 1 darken.

    Returns:
        Array with the same shape and dtype as ``image`` holding the
        gamma-corrected values (quantized to whole 0..255 levels).
    """
    image = np.asarray(image)
    inv_gamma = 1.0 / gamma
    # 256-entry table mapping each 8-bit level to its corrected level.
    table = (((np.arange(256) / 255.0) ** inv_gamma) * 255).astype("uint8")
    # Clip before the uint8 cast so out-of-range values cannot wrap around.
    levels = np.clip(image, 0, 255).astype(np.uint8)
    return table[levels].astype(image.dtype)

#模糊
def adjust_blur(image):
    """Blur a channel-first image with cv2.blur using a 3x3 kernel.

    Args:
        image: array of shape (channel, height, width).

    Returns:
        Blurred array, transposed back to (channel, height, width).
    """
    # cv2.blur is applied on the height x width x channel layout.
    hwc = np.transpose(image, (1, 2, 0))
    smoothed = cv2.blur(hwc, (3, 3))
    return np.transpose(smoothed, (2, 0, 1))



def next_minibatch(minibatch_size,is_train=True):
    """Load the next batch of training images, scaled to the range [-1, 1].

    Reads paths from the module-level list ``s`` starting at the cursor
    ``idx``; when the list is exhausted it is reshuffled and the cursor is
    reset. Files that raise OSError are reported and skipped.

    Args:
        minibatch_size: number of images to return.
        is_train: when True, apply random augmentation (noise, gamma, blur)
            so the discriminator sees more varied inputs.

    Returns:
        float32 array of shape (minibatch_size, channel, height, width) in
        BGR channel order with values (pixel - 127.5) / 127.5.
    """
    global s, idx
    features = []
    # NOTE: this loop only ends once enough images load successfully; if every
    # file fails with OSError it keeps cycling through the list.
    while len(features) < minibatch_size:
        try:
            # The context manager closes the file handle once the pixels have
            # been converted. Image.LANCZOS is the same filter the removed
            # Image.ANTIALIAS alias referred to.
            with Image.open(s[idx]) as raw:
                im = raw.convert('RGB').resize((img_w, img_h), Image.LANCZOS)
            im = img2array(im).astype(np.float32)
            # Augment the data to keep the inputs varied and stop the
            # discriminator from memorizing the samples.
            if is_train:
                im = add_noise(im)
                if random.randint(0, 10) % 2 == 0:
                    gamma = np.random.choice(np.arange(0.6, 1.5, 0.05))
                    # Keep the result: it was previously assigned to an unused
                    # name, so the gamma augmentation never took effect.
                    im = adjust_gamma(im, gamma)
                if random.randint(0, 10) % 5 <= 1:
                    im = adjust_blur(im)

            features.append((im - 127.5) / 127.5)
        except OSError as e:
            print(e)
        idx += 1
        if idx >= len(s):
            random.shuffle(s)
            idx = 0
    return np.asarray(features).astype(np.float32)

![md_images](../Images/self_attention_module.png)

In [None]:

def self_attn_block(x,num_filters, squeeze_factor=8):
    '''
    Self-attention block over the spatial positions of a (C, H, W) feature map.

    Adapted from https://github.com/taki0112/Self-Attention-GAN-Tensorflow

    That reference code works on channels-last tensors, where reshaping to
    (N, C) keeps one spatial position per row. Here the tensors are
    channels-first, so each map is reshaped to (C, N) and then transposed to
    (N, C). Reshaping (C, H, W) directly to (-1, C) would mix channels and
    positions within a row, and would not match the (H, W, C) un-flattening
    applied to the attention output below.

    Args:
        x: input feature map of shape (C, H, W).
        num_filters: channel count of the value projection; it must equal the
            channel count of ``x`` for the residual addition.
        squeeze_factor: channel reduction applied to the query/key projections.

    Returns:
        ``gamma * attention_output + x`` with the same shape as ``x``; gamma is
        a learnable scalar initialized to 0, so the block starts as identity.
    '''
    f = Convolution((1, 1), num_filters // squeeze_factor, pad=True, name='f_conv')(x)
    g = Convolution((1, 1), num_filters // squeeze_factor, pad=True, name='g_conv')(x)
    h = Convolution((1, 1), num_filters, pad=True, name='h_conv')(x)

    h_shape = h.shape

    def _flatten_positions(feature_map):
        # (C, H, W) -> (C, N) -> (N, C), with N = H * W.
        channels = feature_map.shape[0]
        return transpose(reshape(feature_map, (channels, -1)), [1, 0])

    flat_f = _flatten_positions(f)
    flat_g = _flatten_positions(g)
    flat_h = _flatten_positions(h)

    # Pairwise position scores: [N, N].
    scores = times_transpose(flat_g, flat_f)

    # Normalize each row so the attention weights of a position sum to 1.
    beta = softmax(scores, -1)
    o = times(beta, flat_h)  # [N, C]
    # Rows are spatial positions, so unflatten to (H, W, C), then go back to CHW.
    o = reshape(o, (h_shape[1], h_shape[2], h_shape[0]))
    o = transpose(o, [2, 0, 1])
    gamma = Parameter(1, init=0.0)
    x = gamma * o + x
    return x



def pixel_shuffle(x,scale):
    """Upsample a (C, H, W) tensor by rearranging channels into space.

    Every group of ``scale * scale`` channels is interleaved into a
    ``scale`` x ``scale`` pixel block, giving
    (C // scale**2, H * scale, W * scale).

    The final step is an explicit reshape rather than an argument-less
    ``squeeze``: squeeze removes every size-1 axis, so it would also drop the
    channel axis when the result has a single channel.

    Args:
        x: input tensor of shape (C, H, W); C is expected to be a multiple of
            ``scale * scale``.
        scale: integer upsampling factor.

    Returns:
        Tensor of shape (C // scale**2, H * scale, W * scale).
    """
    channels, h, w = x.shape[0], x.shape[1], x.shape[2]
    out_channels = channels // (scale * scale)

    # Split the channel axis into (out_channels, scale, scale) and move the two
    # sub-pixel axes behind the spatial axes: (C', H, W, scale, scale).
    x = C.reshape(x, (out_channels, scale, scale, h, w))
    x = transpose(x, [0, 3, 4, 1, 2])

    # Concatenate the per-row slices along the first sub-pixel axis, then the
    # per-column slices along the second, interleaving the sub-pixels.
    row_slices = [x[:, i, :, :, :] for i in range(h)]
    x = splice(*row_slices, axis=3)
    col_slices = [x[:, :, i, :, :] for i in range(w)]
    x = splice(*col_slices, axis=4)

    # Collapse the leftover size-1 spatial axes without touching the channels.
    x = C.reshape(x, (out_channels, h * scale, w * scale))
    return x


![md_images](../Images/pixelshuffle.jpg)

In [None]:

def conv_bn_leaky_relu(input, filter_size, num_filters, strides=(1, 1), init=C.xavier(0.0005),dilation=1, bias=False):
    """Padded Convolution2D (no activation) -> BatchNormalization -> leaky_relu.

    Args:
        input: tensor fed to the convolution.
        filter_size: convolution kernel size.
        num_filters: number of output feature maps.
        strides: convolution strides.
        init: weight initializer.
        dilation: convolution dilation.
        bias: whether the convolution has a bias term.

    Returns:
        The activated, batch-normalized convolution output.
    """
    conv_layer = Convolution2D(
        filter_size,
        num_filters,
        activation=None,
        init=init,
        pad=True,
        strides=strides,
        dilation=dilation,
        bias=bias,
    )
    normalized = BatchNormalization()(conv_layer(input))
    return leaky_relu(normalized)

def conv_bn_relu(input, filter_size, num_filters, strides=(1, 1), init=C.xavier(0.0005),dilation=1, bias=False):
    """Padded Convolution2D (no activation) -> BatchNormalization -> relu.

    Args:
        input: tensor fed to the convolution.
        filter_size: convolution kernel size.
        num_filters: number of output feature maps.
        strides: convolution strides.
        init: weight initializer.
        dilation: convolution dilation.
        bias: whether the convolution has a bias term.

    Returns:
        The activated, batch-normalized convolution output.
    """
    conv_layer = Convolution2D(
        filter_size,
        num_filters,
        activation=None,
        init=init,
        pad=True,
        strides=strides,
        dilation=dilation,
        bias=bias,
    )
    normalized = BatchNormalization()(conv_layer(input))
    return relu(normalized)


def conv_leaky_relu(input, filter_size, num_filters, strides=(1, 1), init=C.xavier(0.0005),dilation=1, bias=False):
    """Padded Convolution2D with a leaky_relu activation and no batch norm.

    Args:
        input: tensor fed to the convolution.
        filter_size: convolution kernel size.
        num_filters: number of output feature maps.
        strides: convolution strides.
        init: weight initializer.
        dilation: convolution dilation.
        bias: whether the convolution has a bias term.

    Returns:
        The activated convolution output.
    """
    activated_conv = Convolution2D(
        filter_size,
        num_filters,
        activation=leaky_relu,
        init=init,
        pad=True,
        strides=strides,
        dilation=dilation,
        bias=bias,
    )
    return activated_conv(input)


def resnet_basic(input, num_filters,dilation=1):
    """Residual block: two 3x3 convolutions added back onto the input.

    Args:
        input: block input; it is added to the branch output, so its shape
            must be compatible with ``num_filters`` output maps.
        num_filters: number of feature maps of both convolutions.
        dilation: dilation used by both convolutions.

    Returns:
        relu(0.2 * branch + input).
    """
    branch = conv_bn_relu(input, (3, 3), num_filters, dilation=dilation, bias=False)
    second_conv = Convolution2D(
        (3, 3),
        num_filters,
        activation=None,
        pad=True,
        strides=1,
        init=C.xavier(0.0005),
        dilation=dilation,
        bias=False,
    )
    branch = BatchNormalization()(second_conv(branch))
    # The residual branch is scaled by 0.2 before the skip connection is added.
    return relu(0.2 * branch + input)


#生成器
def generator(z):
    """Map a noise vector to a 3-channel image with values in (-1, 1).

    A Dense layer produces a (256, img_h // 16, img_w // 16) tensor, which is
    upsampled by four pixel_shuffle(..., 2) stages interleaved with
    convolution, residual and self-attention blocks.

    Args:
        z: noise input.

    Returns:
        Output of the final 1x1 convolution with a tanh activation.
    """
    # Project the noise onto the smallest feature map.
    net = Dense([256, img_h // 16, img_w // 16], activation=None, init=xavier(0.02), bias=False)(z)

    # Upsampling stage 1.
    net = pixel_shuffle(net, 2)
    net = conv_bn_relu(net, (5, 5), 256)

    # Upsampling stage 2.
    net = pixel_shuffle(net, 2)
    net = conv_bn_relu(net, (5, 5), 128)
    net = resnet_basic(net, 128)
    net = conv_bn_relu(net, (3, 3), 128)

    # Upsampling stage 3.
    net = pixel_shuffle(net, 2)
    net = conv_bn_relu(net, (3, 3), 128)
    net = resnet_basic(net, 128)
    net = conv_bn_relu(net, (3, 3), 64)

    # Self-attention on the 64-channel feature map.
    net = self_attn_block(net, 64)

    # Upsampling stage 4, followed by dilated convolutions.
    net = pixel_shuffle(net, 2)
    net = conv_bn_relu(net, (3, 3), 64, dilation=2)
    net = conv_bn_relu(net, (3, 3), 64, dilation=4)
    net = conv_bn_relu(net, (3, 3), 64)

    # Reduce to 3 channels; tanh keeps the values between -1 and 1, matching
    # the (pixel - 127.5) / 127.5 scaling of the training images.
    to_image = Convolution2D((1, 1), 3, activation=tanh, init=xavier(0.0005), pad=True, strides=1, bias=False)
    return to_image(net)

#鑑別器
def discriminator(x):
    """Score an image; the result is a raw value with no final activation.

    Strided and dilated convolution blocks, one self-attention block, dropout
    and two residual blocks feed a global average pooling and a Dense(1) layer.

    Args:
        x: image tensor.

    Returns:
        Output of the final bias-free Dense(1) layer.
    """
    # First downsampling step plus a dilated convolution.
    feat = conv_bn_leaky_relu(x, (5, 5), 32, strides=2)
    feat = conv_bn_leaky_relu(feat, (3, 3), 32, strides=1, dilation=2)

    # Second downsampling step, then self-attention on 64 channels.
    feat = conv_bn_leaky_relu(feat, (5, 5), 64, strides=2)
    feat = self_attn_block(feat, 64)

    # Dilated convolutions, followed by dropout.
    feat = conv_bn_leaky_relu(feat, (3, 3), 64, strides=1, dilation=4)
    feat = conv_bn_leaky_relu(feat, (3, 3), 128, strides=1, dilation=8)
    feat = dropout(feat, 0.5)

    # Residual blocks around the last downsampling step.
    feat = resnet_basic(feat, 128)
    feat = conv_bn_leaky_relu(feat, (3, 3), 128, strides=2)
    feat = resnet_basic(feat, 128)
    feat = conv_bn_leaky_relu(feat, (3, 3), 128, strides=1)
    feat = conv_bn_leaky_relu(feat, (1, 1), 64, strides=1)

    # Pool over the spatial axes, drop the size-1 axes and project to one value.
    pooled = squeeze(GlobalAveragePooling()(feat))
    return Dense(1, activation=None, bias=False)(pooled)

In [None]:
def tile_rgb_images(x, row=2, col=2):
    """Draw the first ``row * col`` images of a batch as a grid and save it.

    The grid is written to ``Results/RaSGAN_<timestamp>.png``; the ``Results``
    directory is created if it does not exist, since savefig does not create
    missing directories.

    Args:
        x: batch of channel-first BGR images with values in [-1, 1]; it must
            hold at least ``row * col`` images.
        row: number of grid rows.
        col: number of grid columns.
    """
    fig = pylab.gcf()
    fig.set_size_inches(col * 2, row * 2)
    pylab.clf()
    pylab.ioff()
    for m in range(row * col):
        pylab.subplot(row, col, m + 1)
        # Undo the (pixel - 127.5) / 127.5 scaling before converting to an image.
        img = array2img(x[m]*127.5+127.5)
        pylab.imshow(img, interpolation="nearest", animated=True)
        pylab.axis("off")

    # Timestamp with spaces and punctuation stripped, used as a unique suffix.
    stamp = str(datetime.datetime.fromtimestamp(time.time()))
    for ch in (' ', ':', '-', '.'):
        stamp = stamp.replace(ch, '')
    filename = 'Results/RaSGAN_{}.png'.format(stamp)

    os.makedirs(os.path.dirname(filename), exist_ok=True)
    pylab.savefig(filename, bbox_inches='tight')
    # Kept from the original flow: the last tile is drawn once more on the
    # current axes after the figure has been saved.
    plt.axis('off')
    plt.imshow(img)
In [None]:
# Number of images (and noise vectors) per training step.
minibatch_size = 32
# Number of training iterations run by the training loop.
num_minibatches = 50000
# Learning rate passed to both Adam learners.
lr = 2e-4
# Small constant added inside the log terms of the losses and passed to Adam.
epsilon=1e-10

def build_RaSGAN_graph(noise_shape, image_shape, generator, discriminator):
    """Build the RaSGAN graph: networks, losses, learners and trainers.

    If a saved generator or discriminator checkpoint exists, it is loaded in
    place of a freshly built network.

    Args:
        noise_shape: shape of the generator's noise input.
        image_shape: shape of a real image input.
        generator: callable building the generator from the noise input.
        discriminator: callable building the discriminator from an image input.

    Returns:
        Tuple ``(X_real, X_fake, D_real, clipped_D_params, Z, G_trainer,
        D_trainer)``.
    """
    # Checkpoint paths. The backslash is escaped so the strings keep exactly
    # the value the former literals had; a bare '\R' is an invalid escape
    # sequence that recent Python versions warn about. These paths must match
    # the ones used when the models are saved. The backslash separator only
    # denotes a sub-directory on Windows.
    generator_checkpoint = 'Models\\RaSGAN_X_fake_64.cnn'
    discriminator_checkpoint = 'Models\\RaSGAN_D_real_64.cnn'

    input_dynamic_axes = [C.Axis.default_batch_axis()]
    Z = C.input_variable(noise_shape, dynamic_axes=input_dynamic_axes)
    X_real = C.input_variable(image_shape, dynamic_axes=input_dynamic_axes)

    # Generator: reuse a saved model when available instead of building a
    # network that would be discarded immediately.
    if os.path.exists(generator_checkpoint):
        generator = C.Function.load(generator_checkpoint)
        X_fake = generator(Z)
        print("Loading existing X_fake")
    else:
        X_fake = generator(Z)

    # Discriminator applied to real images.
    if os.path.exists(discriminator_checkpoint):
        D_real = C.Function.load(discriminator_checkpoint)
        D_real = D_real(X_real)
        print("Loading existing D_real")
    else:
        D_real = discriminator(X_real)

    # Discriminator applied to generated images; it shares D_real's parameters.
    D_fake = D_real.clone(
        method='share',
        substitutions={X_real: X_fake.output})

    # Clipped copies of the discriminator parameters. They are returned to the
    # caller; nothing in this function assigns them back.
    clipped_D_params = [C.clip(p, -0.5, 0.5) for p in D_real.parameters]

    # Relativistic average losses.
    # NOTE(review): reduce_mean is called without an axis argument; confirm
    # that it averages over the minibatch as the relativistic *average*
    # formulation requires.
    D_r_tilde = sigmoid(D_real - reduce_mean(D_fake))
    D_f_tilde = sigmoid(D_fake - reduce_mean(D_real))
    D_loss = - reduce_mean(log(D_r_tilde + epsilon)) - reduce_mean(log(1 - D_f_tilde + epsilon))
    G_loss = - reduce_mean(log(D_f_tilde + epsilon)) - reduce_mean(log(1 - D_r_tilde + epsilon))

    pp_G = C.logging.ProgressPrinter(50)
    pp_D = C.logging.ProgressPrinter(50)

    # The generator learner uses no momentum; the discriminator learner uses 0.9.
    G_learner = C.adam(
        parameters=X_fake.parameters,
        lr=C.learning_rate_schedule(lr, C.UnitType.sample),
        momentum= momentum_schedule(0.0),
        l2_regularization_weight=5e-5,
        unit_gain =False,
        use_mean_gradient=True,epsilon=epsilon)

    D_learner = C.adam(
        parameters=D_real.parameters,
        lr=C.learning_rate_schedule(lr, C.UnitType.sample),
        momentum=momentum_schedule(0.9),
        l2_regularization_weight=5e-5,
        unit_gain =False,
        use_mean_gradient=True,epsilon=epsilon)

    # Each trainer reports its loss plus the mean discriminator output as metric.
    G_trainer = C.Trainer(X_fake,(G_loss, reduce_mean(D_fake)),G_learner,pp_G)
    D_trainer = C.Trainer(D_real,(D_loss, reduce_mean(D_real)),D_learner,pp_D)

    return X_real, X_fake, D_real,clipped_D_params, Z, G_trainer, D_trainer

In [None]:
X_real, X_fake, D_real,clipped_D_params, Z, G_trainer, D_trainer = build_RaSGAN_graph(100, (3, 64, 64), generator, discriminator)

print("第一筆是鑑別器損失，metrics為D_real值；第二筆是生成器損失，metrics為D_fake值，兩者都是越大越被判定為真圖片")

# Make sure the checkpoint folder exists before the first save.
os.makedirs('Models', exist_ok=True)

for mbs in range(num_minibatches):
    Z_data = noise_sample(minibatch_size)
    X_data = next_minibatch(minibatch_size)

    # Weight clipping via clipped_D_params is currently disabled.

    # One discriminator step, then one generator step, on the same batch.
    D_trainer.train_minibatch({X_real: X_data, Z: Z_data})
    G_trainer.train_minibatch({Z: Z_data,X_real: X_data})

    # Preview and checkpoint every 20 steps for the first 10000 steps, then
    # every 50 steps.
    step = mbs + 1
    if (mbs < 10000 and step % 20 == 0) or (mbs >= 10000 and step % 50 == 0):
        tile_rgb_images(X_fake(Z_data), 4, 4)
        # Escaped backslashes keep the same path strings as before without the
        # invalid '\R' escape sequence; they must match the paths the graph
        # builder loads from.
        X_fake.save('Models\\RaSGAN_X_fake_64.cnn')
        D_real.save('Models\\RaSGAN_D_real_64.cnn')

    # Learning-rate decay is currently disabled.
                
 