# Use This Link to get into the runned notebook with attached results.
https://www.kaggle.com/mohammednamory/stegcycgans/notebook

# 0. Import Libraries

In [1]:
!pip install git+https://www.github.com/keras-team/keras-contrib.git

In [2]:
from random import random
from numpy import load
from numpy import zeros
from numpy import ones
from numpy import asarray
from numpy.random import randint
from tensorflow.keras.optimizers import Adam
from keras.initializers import RandomNormal
from keras.models import Model
from keras.models import Input
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Activation
from keras.layers import Concatenate
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization
from matplotlib import pyplot

# example of using saved cyclegan models for image translation
from keras.models import load_model

We have 2 generators and 2 discriminators corresponding to each domain of images (could be the real images and the stegano one; re-check the paper to know what are your 2 domains of images)

# 1.\
Domain-A -> Discriminator-A -> [Real/Fake] \
Domain-B -> Generator-A -> Discriminator-A -> [Real/Fake]

Domain-B -> Discriminator-B -> [Real/Fake]\
Domain-A -> Generator-B -> Discriminator-B -> [Real/Fake]

#2.\
Cycle Consistency \
((Genertor A: means it generate sth as domain A, but takes input as anything except domain A)) \
**Domain-B** -> **Generator-A** -> **Domain-A** -> Generator-B -> Domain-B
#3.\
Identity Mapping (result in better color profile)

# 1. Load the images [train+test; 2 domains) into two arrays then compress the 2 arrays in one compressed array 

In [3]:
# example of preparing the horses and zebra dataset
from os import listdir
from numpy import asarray
from numpy import vstack
from keras.preprocessing.image import img_to_array
from keras.preprocessing.image import load_img
from numpy import savez_compressed
import pandas as pd
from PIL import Image, ImageDraw
import random as rd
import numpy as np

In [4]:
# load all images in a directory into memory
def load_images(path, size=(256,256)):
	data_list = list()
	# enumerate filenames in directory, assume all are images
	for filename in listdir(path):
		# load and resize the image
		pixels = load_img(path + filename, target_size=size)
		# convert to numpy array
		pixels = img_to_array(pixels)
		# store
		data_list.append(pixels)
	return asarray(data_list)

# dataset path
path = '../input/massachusetts-buildings-dataset/png/'

# load dataset A
dataA1 = load_images(path + 'train/')
dataAB = load_images(path + 'test/')
dataA = vstack((dataA1, dataAB))
print('Loaded dataA: ', dataA.shape)

# load dataset B
# dataB1 = load_images(path + 'trainB/')
# dataB2 = load_images(path + 'testB/')
# dataB = vstack((dataB1, dataB2))
# print('Loaded dataB: ', dataB.shape)

# save as compressed numpy array (Both domains; train and test)
# filename = '/content/drive/MyDrive/noise2text/noise2text_256.npz'
# savez_compressed(filename, dataA, dataB)
# print('Saved dataset: ', filename)

## 1.1. Load text files and convert to images

In [5]:
dataset = pd.read_csv("../input/text-trail/tamil_thirukkural_train.csv")
sub_df= dataset.iloc[0]

In [6]:
text_img =[]
Rand_number_for_color = randint(0, 255)
for i in range(0,137):
  img = Image.new('RGB', (256, 256), color = (Rand_number_for_color, Rand_number_for_color, Rand_number_for_color))
  d = ImageDraw.Draw(img)
  d.text((5,5), dataset.at[i,'explanation'], fill=(255,255,0))
  n = np.array(img)
  text_img.append(n)



In [7]:
text_img_test =[]
for i in range(137,147):
  img = Image.new('RGB', (256, 256), color = (Rand_number_for_color, Rand_number_for_color, Rand_number_for_color))
  d = ImageDraw.Draw(img)
  d.text((5,5), dataset.at[i,'explanation'], fill=(255,255,0))
  n = np.array(img)
  text_img_test.append(n)


In [8]:
list_of_gen_img_train = np.array(text_img)
list_of_gen_img_test = np.array(text_img_test)

## Compress together

In [9]:
dataA = vstack((dataA1, dataAB))
dataB = vstack((list_of_gen_img_train, list_of_gen_img_test))

In [10]:
filename = 'CS_text.npz'
savez_compressed(filename, dataA, dataB)
print('Saved dataset: ', filename)

## 1.2. Load the compressed images and plot

In [13]:
# load and plot the prepared dataset
from numpy import load
from matplotlib import pyplot

# load the dataset
data = load('CS_text.npz')
dataA, dataB = data['arr_0'], data['arr_1']
print('Loaded: ', dataA.shape, dataB.shape)

# plot source images
n_samples = 3
for i in range(n_samples):
	pyplot.subplot(3, n_samples, 1 + i)
	pyplot.axis('off')
	pyplot.imshow(dataA[i].astype('uint8'))
 
# plot target image
for i in range(n_samples):
	pyplot.subplot(3, n_samples, 1 + n_samples + i)
	pyplot.axis('off')
	pyplot.imshow(dataB[i].astype('uint8'))
pyplot.show()

# 2. Develop Cycle GAN
(Use keras; output -> Color Images of size 256x256) \
- The architecture is comprised of **four** models, **two discriminator** models, and **two generator** models.



## 2.1. Discriminator Model

- **Discriminator** is CNN for **image classification**; one for each domain.

- CycleGAN **discriminator**: `Convolutional-BatchNorm-LeakyReLU` but uses **InstanceNormalization** instead of BatchNormalization; standardizing the values on **each output feature map**, rather than across features in a batch. Instance norm has implementation in [keras contrib project](https://github.com/keras-team/keras-contrib).

- The discriminator model takes a 256×256 sized image as input and **outputs** a **patch of predictions 70x70**

In [14]:
# define the discriminator model
def define_discriminator(image_shape):

	# weight initialization
	init = RandomNormal(stddev=0.02)
	# source image input
	in_image = Input(shape=image_shape)
	# C64
	d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(in_image)
	d = LeakyReLU(alpha=0.2)(d)
	# C128
	d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
  # “axis” argument is set to -1 to ensure that features are normalized per feature map.
	d = InstanceNormalization(axis=-1)(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C256
	d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = InstanceNormalization(axis=-1)(d)
	d = LeakyReLU(alpha=0.2)(d)
	# C512
	d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
	d = InstanceNormalization(axis=-1)(d)
	d = LeakyReLU(alpha=0.2)(d)
	# second last output layer
	d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
	d = InstanceNormalization(axis=-1)(d)
	d = LeakyReLU(alpha=0.2)(d)
	# patch output
	patch_out = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
	# define model
	model = Model(in_image, patch_out)
	# compile model
	model.compile(loss='mse', optimizer=Adam(lr=0.0002, beta_1=0.5), loss_weights=[0.5])
 
	return model

## 2.2. Generator Model
- Generator is an **encoder-decoder** model.
- Model takes **source image** (domain A) and generates **target image** (domain B).
1. Downsample (encode) input (by bottleNeck)
2. Interpret this by **ResNet**, then series of **upsample** or decode to return back to original size.

### ResNet: Used to solve the problem of the DL models having exploding or vanishing gradients.

In [15]:
# Generator: ResNet block
#### Probably we can ignore this
def resnet_block(n_filters, input_layer):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# first layer convolutional layer
	g = Conv2D(n_filters, (3,3), padding='same', kernel_initializer=init)(input_layer)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# second convolutional layer
	g = Conv2D(n_filters, (3,3), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	# concatenate merge channel-wise with input layer
	g = Concatenate()([g, input_layer])
	return g

In [16]:
### 9-ResNet blocks for 256x256 images for generator

# define the standalone generator model
def define_generator(image_shape, n_resnet=9):
	# weight initialization
	init = RandomNormal(stddev=0.02)
	# image input
	in_image = Input(shape=image_shape)
	# c7s1-64
	g = Conv2D(64, (7,7), padding='same', kernel_initializer=init)(in_image)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# d128
	g = Conv2D(128, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# d256
	g = Conv2D(256, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# R256
	for _ in range(n_resnet):
		g = resnet_block(256, g)
	# u128
	g = Conv2DTranspose(128, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# u64
	g = Conv2DTranspose(64, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	g = Activation('relu')(g)
	# c7s1-3
	g = Conv2D(3, (7,7), padding='same', kernel_initializer=init)(g)
	g = InstanceNormalization(axis=-1)(g)
	out_image = Activation('tanh')(g)
	# define model
	model = Model(in_image, out_image)
	return model

- **Discriminator** models are trained directly on **real and generated** images.
- Generator models are trained via their discriminator models to **minimize** the **adversarial loss** [For discriminator decibt]. (**L2 distance** between model out and target values '1: real', '0: fake')
- Also, generators have "**cycle loss**" [For regeneration]. (2 losses: Fwd + Bwd)
- Finally, **Identity loss** [Maintain same image if it's originally from the target domain] (1 Loss)

== Both **cycle and Identity loss** are **L1 distance ** between input and output image for each translation \
== A composite model is built to allow sharing the weights of the generator with the related discriminator. \
- The composite model has 2 inputs (images from A and B) and 4 outputs (discriminator out, identity generated image, forward-cycle generated img, bwd cycle img)




In [17]:
# define a composite model for updating generators by adversarial and cycle loss
def define_composite_model(g_model_1, d_model, g_model_2, image_shape):
	# ensure the model we're updating is trainable
	g_model_1.trainable = True
	# mark discriminator as not trainable
	d_model.trainable = False
	# mark other generator model as not trainable
	g_model_2.trainable = False
	# discriminator element
	input_gen = Input(shape=image_shape)
	gen1_out = g_model_1(input_gen)
	output_d = d_model(gen1_out)
	# identity element
	input_id = Input(shape=image_shape)
	output_id = g_model_1(input_id)
	# forward cycle
	output_f = g_model_2(gen1_out)
	# backward cycle
	gen2_out = g_model_2(input_id)
	output_b = g_model_1(gen2_out)
	# define model graph
	model = Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
	# define optimization algorithm configuration
	opt = Adam(lr=0.0002, beta_1=0.5)
	# compile model with weighting of least squares loss and L1 loss
	model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
	return model

We need to create a composite model for each generator: Generator-A (input: B, out: A) and Generator B. \
- Here: A -> Horse \

**Generator-A Composite Model** (BtoA or Zebra to Horse)

The inputs, transformations, and outputs of the model are as follows:

**Adversarial Loss**: Domain-B -> Generator-A -> Domain-A -> Discriminator-A -> [real/fake]
**Identity Loss**: Domain-A -> Generator-A -> Domain-A
**Forward Cycle Loss**: Domain-B -> Generator-A -> Domain-A -> Generator-B -> Domain-B
**Backward Cycle Loss**: Domain-A -> Generator-B -> Domain-B -> Generator-A -> Domain-A
We can summarize the inputs and outputs as:

Inputs: Domain-B, Domain-A
Outputs: Real/Fake, Domain-A, Domain-B, Domain-A


# 3. Load the images 
### As a list of 2 numpy array (coming from the compressed NPZ) [1st source image, 2nd target image]

In [18]:
# load and prepare training images
def load_real_samples(filename):
	# load the dataset
	data = load(filename)
	# unpack arrays
	X1, X2 = data['arr_0'], data['arr_1']
	# scale from [0,255] to [-1,1]
	X1 = (X1 - 127.5) / 127.5
	X2 = (X2 - 127.5) / 127.5
	return [X1, X2]

## 3.1. Selecting Random batches of images from each domain for the discriminator and composite generator

In [19]:
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# retrieve selected images
	X = dataset[ix]
	# generate 'real' class labels (1)
	y = ones((n_samples, patch_shape, patch_shape, 1))
	return X, y

## 3.2. Select Sample of generated images to update the discriminator

In [20]:
# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, dataset, patch_shape):
	# generate fake instance
	X = g_model.predict(dataset)
	# create 'fake' class labels (0)
	y = zeros((len(X), patch_shape, patch_shape, 1))
	return X, y

Note that GANs don't converge rather they reach an **equilibrium** state, thus we can decide when to **stop** (epochs) based on the **generated image quality** (save generator each n epochs and generate with it)

# 4. Model performance
## 4.1. Save generator models each epoch

In [21]:
# save the generator models to file
def save_models(step, g_model_AtoB, g_model_BtoA):
	# save the first generator model
	filename1 = 'g_model_AtoB_%06d.h5' % (step+1)
	g_model_AtoB.save(filename1)
	# save the second generator model
	filename2 = 'g_model_BtoA_%06d.h5' % (step+1)
	g_model_BtoA.save(filename2)
	print('>Saved: %s and %s' % (filename1, filename2))

## 4.2. Plot Sample of source images (1st row) against the generated images (2nd row)

In [22]:
# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, trainX, name, n_samples=5):
	# select a sample of input images
	X_in, _ = generate_real_samples(trainX, n_samples, 0)
	# generate translated images
	X_out, _ = generate_fake_samples(g_model, X_in, 0)
	# scale all pixels from [-1,1] to [0,1]
	X_in = (X_in + 1) / 2.0
	X_out = (X_out + 1) / 2.0
	# plot real images
	for i in range(n_samples):
		pyplot.subplot(2, n_samples, 1 + i)
		pyplot.axis('off')
		pyplot.imshow(X_in[i])
	# plot translated image
	for i in range(n_samples):
		pyplot.subplot(2, n_samples, 1 + n_samples + i)
		pyplot.axis('off')
		pyplot.imshow(X_out[i])
	# save plot to file
	filename1 = '%s_generated_plot_%06d.png' % (name, (step+1))
	pyplot.savefig(filename1)
	pyplot.close()

#### Note: Image Pool for discriminator

In [23]:
# update image pool for fake images
def update_image_pool(pool, images, max_size=50):
	selected = list()
	for image in images:
		if len(pool) < max_size:
			# stock the pool
			pool.append(image)
			selected.append(image)
		elif random() < 0.5:
			# use image, but don't add it to the pool
			selected.append(image)
		else:
			# replace an existing image and use replaced image
			ix = randint(0, len(pool))
			selected.append(pool[ix])
			pool[ix] = image
	return asarray(selected)

# 5. Train Models
The batch size is 1, so the number of iterations in each epoch. Models are saved every 5 epochs.

Images are generated using both generators each epoch.

Batch of real images from each domain is selected then a batch of fake images is generated to update each discriminator's fake image pool.

Generator-A is updated by the composite model then by discriminator-A.

In [24]:
# train cyclegan models
### 2 generator, 2 discriminator, 2 composite
def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset):
	# define properties of the training run
  #### epochs: changed to 10 to fasten the work a bit
	n_epochs, n_batch, = 100, 1
	# determine the output square shape of the discriminator
	n_patch = d_model_A.output_shape[1]
	# unpack dataset
	trainA, trainB = dataset
	# prepare image pool for fakes
	poolA, poolB = list(), list()
	# calculate the number of batches per training epoch
	bat_per_epo = int(len(trainA) / n_batch)
	# calculate the number of training iterations
	n_steps = bat_per_epo * n_epochs
	# manually enumerate epochs
	for i in range(n_steps):
		# select a batch of real samples
		X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
		X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
		# generate a batch of fake samples
		X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
		X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
		# update fakes from pool
		X_fakeA = update_image_pool(poolA, X_fakeA)
		X_fakeB = update_image_pool(poolB, X_fakeB)
		# update generator B->A via adversarial and cycle loss
		g_loss2, _, _, _, _  = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
		# update discriminator for A -> [real/fake]
		dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
		dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
		# update generator A->B via adversarial and cycle loss
		g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
		# update discriminator for B -> [real/fake]
		dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
		dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)
		# summarize performance
		print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
		# evaluate the model performance every so often
		if (i+1) % (bat_per_epo * 1) == 0:
			# plot A->B translation
			summarize_performance(i, g_model_AtoB, trainA, 'AtoB')
			# plot B->A translation
			summarize_performance(i, g_model_BtoA, trainB, 'BtoA')
		if (i+1) % (bat_per_epo * 5) == 0:
			# save the models
			save_models(i, g_model_AtoB, g_model_BtoA)

In [25]:
# load image data
dataset = load_real_samples('CS_text.npz')

print('Loaded', dataset[0].shape, dataset[1].shape)

# define input shape based on the loaded dataset
image_shape = dataset[0].shape[1:]

# generator: A -> B
g_model_AtoB = define_generator(image_shape)
# generator: B -> A
g_model_BtoA = define_generator(image_shape)

# discriminator: A -> [real/fake]
d_model_A = define_discriminator(image_shape)
# discriminator: B -> [real/fake]
d_model_B = define_discriminator(image_shape)

# composite: A -> B -> [real/fake, A]
c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)

# train models
train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset)



**Trials**
1. For 5 epochs and (37, 38 train images) => no change in the images just bluring or little color distortion.
2. For 100 epochs and same number of images => just makes distortions in the images but not generate anything (we need large number of dataset)

===> If this didn't work too then we must provide some sort of large dataset

# 6. Image Translation with Cycle-GAN (Example)

In [26]:
# select a random sample of images from the dataset
def select_sample(dataset, n_samples):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# retrieve selected images
	X = dataset[ix]
	return X

In [27]:
# plot the image, the translation, and the reconstruction
def show_plot(imagesX, imagesY1, imagesY2):
	images = vstack((imagesX, imagesY1, imagesY2))
	titles = ['Real', 'Generated', 'Reconstructed']
	# scale from [-1,1] to [0,1]
	images = (images + 1) / 2.0
	# plot images row by row
	for i in range(len(images)):
		# define subplot
		pyplot.subplot(1, len(images), 1 + i)
		# turn off axis
		pyplot.axis('off')
		# plot raw pixel data
		pyplot.imshow(images[i])
		# title
		pyplot.title(titles[i])
	pyplot.show()

In [34]:
# load dataset
A_data, B_data = load_real_samples('./CS_text.npz')
print('Loaded', A_data.shape, B_data.shape)

In [32]:
#### Specify how to load the InstanceNormalization layer while loading the genenrator model
# load the models
## Take the generators at epoch 7 
cust = {'InstanceNormalization': InstanceNormalization}
model_AtoB = load_model('./g_model_AtoB_014700.h5', cust)
model_BtoA = load_model('./g_model_BtoA_014700.h5', cust)

In [35]:
## Select image from Domain-A, convert it to B by generator AtoB
## then reconstruct it to A again by Generator BtoA

# plot A->B->A
A_real = select_sample(A_data, 1)
B_generated  = model_AtoB.predict(A_real)
A_reconstructed = model_BtoA.predict(B_generated)
show_plot(A_real, B_generated, A_reconstructed)

In [36]:
# plot B->A->B
B_real = select_sample(B_data, 1)
A_generated  = model_BtoA.predict(B_real)
B_reconstructed = model_AtoB.predict(A_generated)
show_plot(B_real, A_generated, B_reconstructed)


# 7. Beneficial Extensions (General)

- Smaller Image Size. Update the example to use a smaller image size, such as 128×128
- Adjust the size of the generator model to use 6 ResNet layers as is used in the cycleGAN paper.
- Different Dataset. Update the example to use the apples to oranges dataset.
- Without Identity Mapping. Update the example to train the generator models without the identity mapping and compare results.