<a href="https://colab.research.google.com/github/huytranvan2010/WGAN/blob/main/WGAN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Implement WGAN theo cách weight clipping trên bộ dữ liệu MNIST. Ở đây chỉ làm cho chữ số 7 để quá trình train nhanh hơn, có thể train cho tất cả tập dữ liệu và kiểm tra xem mode collapse có bị xảy ra không.

In [1]:
# import các thư việ cần thiết
import numpy as np
from tensorflow.keras.datasets import mnist
import tensorflow as tf
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Reshape, Flatten, Conv2D, Conv2DTranspose, LeakyReLU, BatchNormalization
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.constraints import Constraint
import matplotlib.pyplot as plt

# xây dựng class kế thừa từ Constraint cho weight clipping
class ClipConstraint(Constraint):
	# set clip value
	def __init__(self, clip_value):
		self.clip_value = clip_value

	# clip weights
	def __call__(self, weights):
		return tf.clip_by_value(weights, -self.clip_value, self.clip_value)

	# trả về config
	def get_config(self):
		return {'clip_value': self.clip_value}

# định nghĩa wasserstein loss
def wasserstein_loss(y_true, y_pred):
	return tf.reduce_mean(y_true * y_pred)

# định nghĩa critic model
def define_critic(in_shape=(28,28,1)):
	# khởi tạo weights
	init = RandomNormal(stddev=0.02)
	# weight constraint
	const = ClipConstraint(0.01)
	#  model
	model = Sequential()
 
	# giảm xuống 14x14
	model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, kernel_constraint=const, input_shape=in_shape))
	model.add(BatchNormalization())
	model.add(LeakyReLU(alpha=0.2))
 
	# giảm xuống 7x7
	model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, kernel_constraint=const))
	model.add(BatchNormalization())
	model.add(LeakyReLU(alpha=0.2))
 
	# linear activation
	model.add(Flatten())
	model.add(Dense(1))
 
	# compile model
	opt = RMSprop(learning_rate=0.00005)
	model.compile(loss=wasserstein_loss, optimizer=opt)
	return model

# định nghĩa generator model
def define_generator(latent_dim):
	# khởi tạo weight
	init = RandomNormal(stddev=0.02)
	# define model
	model = Sequential()
 
	# foundation for 7x7 image
	n_nodes = 128 * 7 * 7
	model.add(Dense(n_nodes, kernel_initializer=init, input_dim=latent_dim))
	model.add(LeakyReLU(alpha=0.2))
	model.add(Reshape((7, 7, 128)))
 
	# tăng kích thước lên 14x14
	model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
	model.add(BatchNormalization())
	model.add(LeakyReLU(alpha=0.2))
 
	# tăng kích thước lên 28x28
	model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
	model.add(BatchNormalization())
	model.add(LeakyReLU(alpha=0.2))
 
	# output 28x28x1 (dùng Conv layer với 1 kernel)
	model.add(Conv2D(1, (7,7), activation='tanh', padding='same', kernel_initializer=init))
	return model

# định nghĩa gan - để update generator
def define_gan(generator, critic):
	# weights của critic bị freeze
	for layer in critic.layers:
		if not isinstance(layer, BatchNormalization):
			layer.trainable = False
	model = Sequential()
	# add generator
	model.add(generator)
	# add the critic
	model.add(critic)
	# compile model
	opt = RMSprop(learning_rate=0.00005)
	model.compile(loss=wasserstein_loss, optimizer=opt)
	return model

# load ảnh thật
def load_real_samples():
	# load dataset
	(trainX, trainy), (_, _) = mnist.load_data()
	# chọn indicies có nhãn là 7
	selected_ix = (trainy == 7)     # trả về array True, False
	X = trainX[selected_ix]         # lấy vị trí của selected_ix = True
	# mở rộng chiều về cuối - phù hợp cho Conv2D
	X = np.expand_dims(X, axis=-1)
	# chuyển từ uint8 => float
	X = X.astype('float32')
	# scale từ [0,255] về [-1,1] - layer cuối của generator dùng tanh
	X = (X - 127.5) / 127.5
	return X

# chọn ảnh thật vào minibatch
def generate_real_samples(dataset, n_samples):
	# chọn ngẫu nhiên n_samples indicies
	ix = np.random.randint(0, dataset.shape[0], n_samples)
	# lấy ảnh images
	X = dataset[ix]
	# tạo label = -1 for cho ảnh thật
	y = - np.ones((n_samples, 1))
	return X, y

# tạo points từ latent space làm đầu vào cho generator
def generate_latent_points(latent_dim, n_samples):
	# tạo n_samples points cho minibatch
	x_input = np.random.randn(n_samples, latent_dim)
	return x_input

# sử dụng points trong latent space để tạo ảnh fake
def generate_fake_samples(generator, latent_dim, n_samples):
	# tạo points trong latent space
	x_input = generate_latent_points(latent_dim, n_samples)
	# output của generator - ảnh fake
	X = generator.predict(x_input)
	# tạo labels = 1 cho ảnh fake
	y = np.ones((n_samples, 1))
	return X, y

# sinh dữ liệu and save as a plot and save the model
def summarize_performance(step, g_model, latent_dim, n_samples=100):
	# prepare fake examples
	X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
	# scale from [-1,1] to [0,1]
	X = (X + 1) / 2.0
	# plot images
	for i in range(10 * 10):    # in 100 ảnh
		# định nghĩa subplot
		plt.subplot(10, 10, 1 + i)
		# turn off axis
		plt.axis('off')
		# plot raw pixel data
		plt.imshow(X[i, :, :, 0], cmap='gray_r')    # đầu ra là tensor 4 chiều, mình chỉ cần 2 chiều cho mỗi ảnh, chỉ số cuối = 0 do có 1 channel
	# save plot to file
	filename1 = 'generated_plot_%04d.png' % (step+1)
	plt.savefig(filename1)
	plt.close()
	# save the generator model
	filename2 = 'model_%04d.h5' % (step+1)
	g_model.save(filename2)
	print('>Saved: %s and %s' % (filename1, filename2))

# create a line plot of loss for the gan and save to file
def plot_history(d1_hist, d2_hist, g_hist):
	# plot history
	plt.plot(d1_hist, label='crit_real')
	plt.plot(d2_hist, label='crit_fake')
	plt.plot(g_hist, label='gen')
	plt.legend()
	plt.savefig('plot_line_plot_loss.png')
	plt.close()

# train the generator and critic
def train(g_model, c_model, gan_model, dataset, latent_dim, n_epochs=10, n_batch=64, n_critic=5):
	# số lượng batches cho mỗi epoch
	batch_per_epoch = dataset.shape[0] // n_batch
	# số lượng training iterations
	n_steps = batch_per_epoch * n_epochs
	# kích thước của nửa batch
	half_batch = n_batch // 2
	# tạo empty lists để lưu các losses
	c1_hist, c2_hist, g_hist = list(), list(), list()
	# duyệt qua training iterations
	for i in range(n_steps):
		# CẬP NHẬT critic
		c1_tmp, c2_tmp = list(), list()
		for _ in range(n_critic):
			# lấy ảnh thật
			X_real, y_real = generate_real_samples(dataset, half_batch)
			# update critic weights
			c_loss1 = c_model.train_on_batch(X_real, y_real)    # do ở trên ko định nghĩa metrics nên trin_on_batch chỉ trả về loss
			c1_tmp.append(c_loss1)
			# tạo ảnh fake
			X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
			# update critic weights
			c_loss2 = c_model.train_on_batch(X_fake, y_fake)
			c2_tmp.append(c_loss2)
   
		# lưu critic loss
		c1_hist.append(np.mean(c1_tmp))     # lấy trung bình cho n_critic lần
		c2_hist.append(np.mean(c2_tmp))
  
        # CẬP NHẬT generator
		# tạo points trong latent space
		X_gan = generate_latent_points(latent_dim, n_batch)
		# gán nhãn cho ảnh fake tại đây là -1 do đang muốn generator tạo ra ảnh giống thật nhất (gán nhãn giống ảnh thật)
		y_gan = - np.ones((n_batch, 1))
		# cập nhật generator thông qua critic 
		g_loss = gan_model.train_on_batch(X_gan, y_gan)
        # lưu lại loss
		g_hist.append(g_loss)
  
		# in ra các loss
		print('>%d, c1=%.3f, c2=%.3f g=%.3f' % (i+1, c1_hist[-1], c2_hist[-1], g_loss))
		# đánh giá model sau mỗi "epoch" (thực hiện được số batches trong 1 epoch)
		if (i+1) % batch_per_epoch == 0:
			summarize_performance(i, g_model, latent_dim)
	# line plots of loss
	plot_history(c1_hist, c2_hist, g_hist)

# số chiều của points trong latent space
latent_dim = 50
# Tạo critic
critic = define_critic()
# Tạo generator
generator = define_generator(latent_dim)
# tạo gan
gan_model = define_gan(generator, critic)
# load ảnh thật
dataset = load_real_samples()
print(dataset.shape)
# train model
train(generator, critic, gan_model, dataset, latent_dim)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
(6265, 28, 28, 1)
>1, c1=-2.476, c2=-0.011 g=0.250
>2, c1=-6.971, c2=0.075 g=-1.079
>3, c1=-9.931, c2=0.158 g=-2.317
>4, c1=-12.631, c2=0.227 g=-3.253
>5, c1=-14.191, c2=0.309 g=-4.490
>6, c1=-16.575, c2=0.371 g=-5.054
>7, c1=-17.585, c2=0.448 g=-5.845
>8, c1=-18.548, c2=0.527 g=-6.841
>9, c1=-20.347, c2=0.575 g=-7.947
>10, c1=-21.509, c2=0.648 g=-8.445
>11, c1=-22.465, c2=0.725 g=-9.050
>12, c1=-23.194, c2=0.767 g=-10.263
>13, c1=-24.587, c2=0.875 g=-10.956
>14, c1=-25.302, c2=0.938 g=-11.672
>15, c1=-26.249, c2=1.029 g=-12.786
>16, c1=-26.067, c2=1.138 g=-13.969
>17, c1=-27.435, c2=1.215 g=-14.983
>18, c1=-27.844, c2=1.310 g=-15.024
>19, c1=-28.405, c2=1.400 g=-15.952
>20, c1=-28.853, c2=1.440 g=-16.817
>21, c1=-28.871, c2=1.515 g=-17.759
>22, c1=-30.255, c2=1.552 g=-18.183
>23, c1=-30.413, c2=1.598 g=-18.772
>24, c1=-31.355, c2=1.577 g=-19.463
>25, c1=-31.304, c2=1.579 g=-19.667
>26, c1=-32.5