In [None]:
# Mount Google Drive inside the Colab runtime so the notebook can reach files
# stored there. force_remount=True mounts again even if a mount already exists.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
# Switch to the project folder on Drive so the relative dataset and output
# paths used in the cells below resolve against it.
import os
os.chdir('/content/drive/MyDrive/university/introduction to AI')

# Library Imports

In [None]:
import numpy as np
import os
import math
import csv
import matplotlib.pyplot as plt
import cv2

import sklearn
from sklearn.preprocessing import MinMaxScaler

# imports keras
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense, Dropout, Activation, Permute, Reshape
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, LeakyReLU, ZeroPadding2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D, Conv2DTranspose, UpSampling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import SpatialDropout2D
from tensorflow.keras.layers import Flatten, Input
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.constraints import max_norm
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tqdm import tqdm

# Global Variables

In [None]:
# Size every cropped image is resized to; also the discriminator's input size.
img_shape = (64, 64)
# Fix the random seed so repeated runs start from the same random state.
tf.keras.utils.set_random_seed(1)
# Mini-batch size passed to fit().
batch_size = 64
# Length of the random noise vector fed to the generator.
latent_dim = 100

# Preprocess The Data


## Load the bounding boxes

In [None]:
IMAGES_BOX_PATH = 'fgvc-aircraft-2013b/data/images_box.txt'
# Maps image name -> bounding-box corners plus a 'tensor' slot. The slot starts
# as an empty array and is filled with pixel data when the images are loaded.
image_infos = {}
with open(IMAGES_BOX_PATH) as f:
  # Iterate the file object directly; no need to hold every line in memory.
  for line in f:
    # split() with no argument tolerates repeated spaces, tabs and '\r'.
    fields = line.split()
    if not fields:
      # Skip blank lines (e.g. a trailing newline at the end of the file)
      # instead of failing on the 5-way unpack below.
      continue
    img_name, xmin, ymin, xmax, ymax = fields
    image_infos[img_name] = {
      'xmin': int(xmin),
      'ymin': int(ymin),
      'xmax': int(xmax),
      'ymax': int(ymax),
      'tensor': np.array([]),
    }

# Display the first entry as a quick sanity check.
next(iter(image_infos.items()))

## Load the Images

In [None]:
IMAGES_PATH = 'fgvc-aircraft-2013b/data/images'

# Pass the list itself to tqdm (not an enumerate object) so it knows the total
# and can show a real progress bar.
for filename in tqdm(os.listdir(IMAGES_PATH)):
  img_name = os.path.splitext(filename)[0]
  info = image_infos.get(img_name)
  if info is None:
    # No bounding box was loaded for this file (stray / non-dataset file).
    continue

  img = cv2.imread(os.path.join(IMAGES_PATH, filename))
  if img is None:
    # cv2.imread signals an unreadable file by returning None, not by raising.
    continue

  # OpenCV decodes to BGR channel order, while matplotlib and Keras'
  # array_to_img (used later in this notebook) interpret arrays as RGB.
  img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

  # Crop to the annotated bounding box, then resize to the model input size.
  cropped_img = img[info['ymin']: info['ymax'], info['xmin']: info['xmax']]
  info['tensor'] = cv2.resize(cropped_img, img_shape)


## Load train, val, test data for family, manufacturer, variant into data structure

In [None]:
# Annotation files, one per (label kind, split). Each is read by load_spec(),
# which expects an image name followed by that image's label on every line.

# Aircraft family labels.
IMAGES_FAMILY_PATH_TRAIN = 'fgvc-aircraft-2013b/data/images_family_train.txt'
IMAGES_FAMILY_PATH_VAL = 'fgvc-aircraft-2013b/data/images_family_val.txt'
IMAGES_FAMILY_PATH_TEST = 'fgvc-aircraft-2013b/data/images_family_test.txt'

# Manufacturer labels.
IMAGES_MANUFACTURER_PATH_TRAIN = 'fgvc-aircraft-2013b/data/images_manufacturer_train.txt'
IMAGES_MANUFACTURER_PATH_VAL = 'fgvc-aircraft-2013b/data/images_manufacturer_val.txt'
IMAGES_MANUFACTURER_PATH_TEST = 'fgvc-aircraft-2013b/data/images_manufacturer_test.txt'

# Variant labels.
IMAGES_VARIANT_PATH_TRAIN = 'fgvc-aircraft-2013b/data/images_variant_train.txt'
IMAGES_VARIANT_PATH_VAL = 'fgvc-aircraft-2013b/data/images_variant_val.txt'
IMAGES_VARIANT_PATH_TEST = 'fgvc-aircraft-2013b/data/images_variant_test.txt'

# Plain image-name lists per split (one image name per line, no label).
IMAGES_PATH_TRAIN = 'fgvc-aircraft-2013b/data/images_train.txt'
IMAGES_PATH_VAL = 'fgvc-aircraft-2013b/data/images_val.txt'
IMAGES_PATH_TEST = 'fgvc-aircraft-2013b/data/images_test.txt'

In [None]:
def load_spec(path, data_type = 'train', spec = 'family'):
  """
    Reads one annotation file and attaches its labels to the global `image_infos`.

    Every non-blank line must look like `<img_name> <label>`; the label is
    everything after the first space and may itself contain spaces. For each
    line, `image_infos[img_name][spec]` is set to the label and
    `image_infos[img_name]['data_type']` is set to `data_type`.

    input
      path : the path for the dataset information in txt
      data_type : a string value of either 'train', 'val', or 'test'
      spec : a string value of either 'family', 'manufacturer' or 'variant'

    raises
      ValueError : a non-blank line has no label after the image name
      KeyError : an image name is not already a key of `image_infos`
  """
  with open(path) as f:
    for line_no, line in enumerate(f, start=1):
      line = line.rstrip('\n')
      if not line.strip():
        # Blank lines (e.g. at the end of the file) carry no annotation.
        continue
      # Split on the first space only so multi-word labels stay intact.
      img_name, _, spec_data = line.partition(' ')
      if not spec_data:
        raise ValueError(
          '{}:{}: expected "<img_name> <{}>", got {!r}'.format(path, line_no, spec, line)
        )
      # adds new element in images_infos dictionary
      image_infos[img_name][spec] = spec_data
      image_infos[img_name]['data_type'] = data_type

# Attach family, manufacturer and variant labels for the train, val and test
# splits. The order (family -> manufacturer -> variant, each train/val/test)
# is the order in which load_spec is invoked.
spec_files = (
  ('family', (IMAGES_FAMILY_PATH_TRAIN, IMAGES_FAMILY_PATH_VAL, IMAGES_FAMILY_PATH_TEST)),
  ('manufacturer', (IMAGES_MANUFACTURER_PATH_TRAIN, IMAGES_MANUFACTURER_PATH_VAL, IMAGES_MANUFACTURER_PATH_TEST)),
  ('variant', (IMAGES_VARIANT_PATH_TRAIN, IMAGES_VARIANT_PATH_VAL, IMAGES_VARIANT_PATH_TEST)),
)

for spec_name, split_paths in spec_files:
  for split_name, split_path in zip(('train', 'val', 'test'), split_paths):
    load_spec(split_path, split_name, spec_name)

# Display the first entry to confirm the labels were attached.
next(iter(image_infos.items()))

## Preprocess Data

### Preprocess X data

In [None]:
def _loaded_tensors(split_path):
  """Return the image tensors of every image listed in `split_path` that was actually loaded."""
  tensors = []
  with open(split_path) as f:
    for line in f:
      img_name = line.strip()
      if not img_name:
        # Ignore blank lines rather than looking up the key ''.
        continue
      tensor = image_infos[img_name]['tensor']
      # An empty array is the placeholder for "image not loaded". Checking the
      # size (rather than np.any) keeps valid images whose pixels are all zero.
      if tensor.size > 0:
        tensors.append(tensor)
  return tensors


# Train and validation images are pooled into a single training set.
X_train = np.array(
  _loaded_tensors(IMAGES_PATH_TRAIN) + _loaded_tensors(IMAGES_PATH_VAL),
  dtype=np.float32,  # half the memory of the float64 that uint8 - 127.5 would produce
)
# Rescale pixel values from [0, 255] to [-1, 1].
X_train = (X_train - 127.5) / 127.5
X_train.shape

# Plot the image

In [None]:
# Preview the first 25 entries of image_infos in a 5x5 grid, titled by image name.
plt.figure(figsize=(10, 10))

preview_items = list(image_infos.items())[:25]
for i, (key, val) in enumerate(preview_items):
  ax = plt.subplot(5, 5, i + 1)
  pixels = val['tensor']
  # Entries without pixel data still get a titled, empty panel.
  if np.any(pixels):
    ax.imshow(pixels.astype("uint8"))
  ax.set_title(key)
  ax.axis("off")

# Build Model


## Generator Model


In [None]:
def define_generator():
  """
    Builds the generator network.

    Takes a noise vector of length `latent_dim`, projects it to a 4x4x256
    feature map, and doubles the spatial size four times (4 -> 8 -> 16 -> 32
    -> 64) with UpSampling2D + Conv2D stages. The last stage outputs 3
    channels and ends in tanh, so every output value lies in [-1, 1].

    returns
      an uncompiled keras Model mapping (latent_dim,) -> (64, 64, 3)
  """
  inputs = Input(shape = (latent_dim, ))
  # foundation for 4x4 image
  x = Dense(256 * 4 * 4, use_bias = False)(inputs)
  x = BatchNormalization()(x)
  x = LeakyReLU(alpha=0.2)(x)
  x = Reshape((4, 4, 256))(x)

  # upsample 4x4 -> 8x8 -> 16x16 -> 32x32, halving the filter count each time
  for filters in (128, 64, 32):
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(filters=filters, kernel_size=(3, 3), strides=(1,1), padding='same', use_bias = False)(x)
    x = BatchNormalization()(x)
    x = LeakyReLU(alpha=0.2)(x)

  # upsample to 64x64 and project to 3 channels
  x = UpSampling2D((2, 2))(x)
  x = Conv2D(filters=3, kernel_size=(3,3), padding='same', use_bias = False)(x)
  x = BatchNormalization()(x)
  # tanh must be the very last op: normalising *after* tanh would rescale and
  # shift the output so it is no longer bounded to [-1, 1].
  outputs = Activation('tanh')(x)
  return Model(inputs = inputs, outputs = outputs)

In [None]:
# Build the generator; this instance is the one later passed to WGAN(generator=...).
gen_model = define_generator()
gen_model.summary()
# Draw the layer graph with tensor shapes for a quick visual check.
tf.keras.utils.plot_model(gen_model, show_shapes=True, show_layer_names=True, dpi=50)

In [None]:
# Smoke test: push one random latent vector through the untrained generator.
# Use latent_dim (not a hard-coded 100) so this keeps working if it is changed.
noise = tf.random.normal([1, latent_dim])
generated_image = gen_model(noise, training=False)

# Show only the first channel of the single generated sample.
plt.imshow(generated_image[0, :, :, 0])

## Discriminator Model

In [None]:
def define_discriminator():
  """
    Builds the discriminator (critic) network.

    Input is an image of shape (img_shape[0], img_shape[1], 3). Four 5x5
    convolutions with stride 2 each halve the spatial size; the result is
    flattened and mapped by a single Dense unit with no activation, so the
    output is an unbounded score per image rather than a probability.

    returns
      an uncompiled keras Model named 'discriminator'
  """
  inputs = Input(shape=(img_shape[0], img_shape[1], 3))

  # Hidden Layer 1 (no dropout on the first stage)
  x = Conv2D(filters=64, kernel_size=(5,5), strides=(2, 2), padding='same')(inputs)
  x = LeakyReLU(alpha=0.2)(x)

  # Hidden Layers 2-4: same pattern with growing filter counts
  for filters in (128, 256, 512):
    x = Conv2D(filters=filters, kernel_size=(5,5), strides=(2, 2), padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = Dropout(0.3)(x)

  # Flatten and Output Layers
  x = Flatten()(x) # Flatten the shape
  x = Dropout(0.2)(x) # Randomly drop some connections for better generalization

  # outputs layer: one raw score, no activation
  outputs = Dense(1)(x)

  return Model(inputs = inputs, outputs = outputs, name = 'discriminator')

# Instantiate the discriminator; this instance is the one later passed to
# WGAN(discriminator=...).
dis_model = define_discriminator()

# Show model summary and plot model diagram
dis_model.summary()
tf.keras.utils.plot_model(dis_model, show_shapes=True, show_layer_names=True, dpi=50)

In [None]:
# Smoke test: score the sample produced by the generator smoke test with the
# (still untrained) discriminator and print the raw output.
decision = dis_model(generated_image)
print(decision)

## GAN Model

In [None]:
class WGAN(Model):
  """
    Wasserstein GAN with gradient penalty, trained through a custom train_step.

    Per batch, the discriminator is updated `discriminator_extra_steps` times
    (each with fresh noise and a gradient-penalty term weighted by
    `gp_weight`), then the generator is updated once.
  """

  def __init__(
    self,
    discriminator,
    generator,
    latent_dim,
    discriminator_extra_steps=3,
    gp_weight=10.0,
  ):
    """
      input
        discriminator : model mapping images to one score per image
        generator : model mapping latent vectors to images
        latent_dim : length of the noise vector fed to the generator
        discriminator_extra_steps : discriminator updates per generator update
        gp_weight : multiplier applied to the gradient penalty
    """
    super().__init__()
    self.discriminator = discriminator
    self.generator = generator
    self.latent_dim = latent_dim
    self.d_steps = discriminator_extra_steps
    self.gp_weight = gp_weight

  def compile(self, d_optimizer, g_optimizer, d_loss_fn, g_loss_fn):
    """
      Stores one optimizer and one loss function per sub-network.

      d_loss_fn is called as d_loss_fn(real_img=..., fake_img=...) and
      g_loss_fn as g_loss_fn(fake_logits); both receive discriminator outputs.
    """
    super().compile()
    self.d_optimizer = d_optimizer
    self.g_optimizer = g_optimizer
    self.d_loss_fn = d_loss_fn
    self.g_loss_fn = g_loss_fn

  def gradient_penalty(self, batch_size, real_images, fake_images):
    """ Calculates the gradient penalty.

    This loss is calculated on an interpolated image
    and added to the discriminator loss.
    """
    # Mixing coefficient per sample, drawn uniformly from [0, 1) so that each
    # interpolated image lies on the segment between its real and fake image.
    # A normal draw would routinely fall outside that range.
    alpha = tf.random.uniform([batch_size, 1, 1, 1], minval=0.0, maxval=1.0)
    diff = fake_images - real_images
    interpolated = real_images + alpha * diff

    with tf.GradientTape() as gp_tape:
      # `interpolated` is a plain tensor, so it has to be watched explicitly.
      gp_tape.watch(interpolated)
      pred = self.discriminator(interpolated, training=True)

    # Gradient of the discriminator output w.r.t. the interpolated images.
    grads = gp_tape.gradient(pred, [interpolated])[0]
    # Per-sample L2 norm over height, width and channels.
    norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
    # Penalise the squared distance of each norm from 1.
    return tf.reduce_mean((norm - 1.0) ** 2)

  def train_step(self, real_images):
    """
      Runs one training step on a batch of real images.

      returns
        dict with "d_loss" (from the last discriminator update) and "g_loss"
    """
    if isinstance(real_images, tuple):
      real_images = real_images[0]

    batch_size = tf.shape(real_images)[0]

    # Discriminator updates. No progress bar here: this method is driven by
    # fit(), which already reports progress per batch.
    for _ in range(self.d_steps):
      random_latent_vectors = tf.random.normal(
        shape=(batch_size, self.latent_dim)
      )
      with tf.GradientTape() as tape:
        fake_images = self.generator(random_latent_vectors, training=True)
        fake_logits = self.discriminator(fake_images, training=True)
        real_logits = self.discriminator(real_images, training=True)
        d_cost = self.d_loss_fn(real_img=real_logits, fake_img=fake_logits)
        gp = self.gradient_penalty(batch_size, real_images, fake_images)
        d_loss = d_cost + gp * self.gp_weight
      # Differentiate and apply outside the tape context so these ops are not
      # themselves recorded on the tape.
      d_gradient = tape.gradient(d_loss, self.discriminator.trainable_variables)
      self.d_optimizer.apply_gradients(
        zip(d_gradient, self.discriminator.trainable_variables)
      )

    # Single generator update.
    random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
    with tf.GradientTape() as tape:
      generated_images = self.generator(random_latent_vectors, training=True)
      gen_img_logits = self.discriminator(generated_images, training=True)
      g_loss = self.g_loss_fn(gen_img_logits)
    gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
    self.g_optimizer.apply_gradients(
      zip(gen_gradient, self.generator.trainable_variables)
    )
    return {"d_loss": d_loss, "g_loss": g_loss}

In [None]:
class GANMonitor(tf.keras.callbacks.Callback):
  """
    Callback that, after every epoch, checkpoints the model being trained and
    writes `num_img` freshly generated sample images to "generated_epoch/".
  """

  def __init__(self, num_img=6, latent_dim=128):
    """
      input
        num_img : number of sample images to generate and save per epoch
        latent_dim : length of the noise vector; must match the generator's input
    """
    super().__init__()
    self.num_img = num_img
    self.latent_dim = latent_dim

  def on_epoch_end(self, epoch, logs=None):
    """Save a checkpoint and `num_img` generated images for this epoch."""
    random_latent_vectors = tf.random.normal(shape=(self.num_img, self.latent_dim))
    generated_images = self.model.generator(random_latent_vectors, training=False)
    # Map generator output back to pixel scale (inverse of (x - 127.5) / 127.5).
    generated_images = (generated_images * 127.5) + 127.5
    # Save the model this callback is attached to, not whatever global happens
    # to be called `wgan`.
    tf.saved_model.save(self.model, "saved_model/my_model/wgan_data_clean_cp")

    # img.save() does not create missing directories.
    os.makedirs("generated_epoch", exist_ok=True)
    for i in range(self.num_img):
      img = generated_images[i].numpy()
      img = tf.keras.preprocessing.image.array_to_img(img)
      # Include the image index so that, with num_img > 1, the images of one
      # epoch do not overwrite each other.
      img.save("generated_epoch/generated_img_{i}_{epoch}.jpg".format(i=i, epoch=epoch))

## Train Model

In [None]:
# NOTE(review): EarlyStopping and ModelCheckpoint are not used in any cell
# shown in this notebook — confirm whether this import is still needed.
from keras.callbacks import EarlyStopping,ModelCheckpoint
# Separate Adam optimizers with identical settings, one per sub-network, so
# each keeps its own optimizer state.
generator_optimizer = Adam(
  learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)
discriminator_optimizer = Adam(
  learning_rate=0.0002, beta_1=0.5, beta_2=0.9
)

def discriminator_loss(real_img, fake_img):
  """
    Discriminator loss: mean score on fake images minus mean score on real images.

    input
      real_img : discriminator outputs for real images
      fake_img : discriminator outputs for generated images
  """
  return tf.reduce_mean(fake_img) - tf.reduce_mean(real_img)


def generator_loss(fake_img):
  """
    Generator loss: the negated mean discriminator score on generated images.

    input
      fake_img : discriminator outputs for generated images
  """
  mean_fake_score = tf.reduce_mean(fake_img)
  return -mean_fake_score

# Number of passes over X_train.
epochs = 600

# Per-epoch callback: generates one sample image with the same latent size the
# generator was built with.
cbk = GANMonitor(num_img=1, latent_dim=latent_dim)

# Wrap the two networks built above into a single trainable model.
wgan = WGAN(
  discriminator=dis_model,
  generator=gen_model,
  latent_dim=latent_dim,
  discriminator_extra_steps=3,
)

# Attach the optimizers and loss functions defined above.
wgan.compile(
  d_optimizer=discriminator_optimizer,
  g_optimizer=generator_optimizer,
  g_loss_fn=generator_loss,
  d_loss_fn=discriminator_loss,
)

# Only images are passed (no labels); WGAN.train_step consumes them as the
# batch of real images.
wgan.fit(X_train, batch_size=batch_size, epochs=epochs, callbacks=[cbk])

## Generate Image

### load model and plot image

In [None]:
# load model
# NOTE(review): the GANMonitor callback writes its checkpoint to
# "saved_model/my_model/wgan_data_clean_cp"; no cell shown in this notebook
# writes "saved_model/my_model/wgan" — confirm this directory exists.
trained_model = tf.saved_model.load("saved_model/my_model/wgan")
g = trained_model.generator

# `%pylab inline` was dropped: it star-imports numpy and pyplot into the
# notebook namespace, and pyplot is already available as `plt`.
grid_rows, grid_cols = 10, 10

# Generate every image of the grid in ONE generator call instead of sampling
# a whole batch per cell and keeping a single image from it.
random_latent_vectors = tf.random.normal(shape=(grid_rows * grid_cols, latent_dim))
generated_images = g(random_latent_vectors, training=False)
# Map generator output back to pixel scale (inverse of (x - 127.5) / 127.5).
generated_images = (generated_images * 127.5) + 127.5

fig = plt.figure(figsize=(20,15))
gs = fig.add_gridspec(grid_rows, grid_cols)

for idx in range(grid_rows * grid_cols):
  num_image = tf.keras.preprocessing.image.array_to_img(generated_images[idx].numpy())
  ax = fig.add_subplot(gs[idx // grid_cols, idx % grid_cols])
  ax.axis('off')
  ax.imshow(num_image)


### save to folder

In [None]:
num_img = 50
output_dir = "/content/drive/MyDrive/university/introduction to AI/group17"
# img.save() does not create missing directories.
os.makedirs(output_dir, exist_ok=True)

# Generate the whole batch once; every saved file is a distinct sample from it.
random_latent_vectors = tf.random.normal(shape=(num_img, latent_dim))
generated_images = g(random_latent_vectors)
# Map generator output back to pixel scale (inverse of (x - 127.5) / 127.5).
generated_images = (generated_images * 127.5) + 127.5

# Files stay numbered 1..num_img, but the batch is 0-indexed: indexing with
# `i` directly would run past the end of the batch on the last iteration.
for i in range(1, num_img + 1):
  img = generated_images[i - 1].numpy()
  img = tf.keras.preprocessing.image.array_to_img(img)
  img.save(os.path.join(output_dir, "{i}.jpg".format(i=i)))