In [None]:
from tensorflow.keras.layers import Input, Dense, Reshape, Flatten, Dropout, Concatenate
from tensorflow.keras.layers import BatchNormalization, Activation, ZeroPadding2D, Add
from tensorflow.keras.layers import MaxPooling2D, UpSampling2D, Conv2D, Conv2DTranspose
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import SGD, Adam
import tensorflow as tf
from random import randint
 
from google.colab.patches import cv2_imshow
import numpy as np
from google.colab import drive
 
from tensorflow.python.framework.ops import disable_eager_execution, enable_eager_execution

import random

from tensorflow.keras.applications.vgg19 import VGG19
import glob

In [None]:
import logging
# Raise the TensorFlow logger threshold to ERROR so lower-severity messages are not emitted.
tf.get_logger().setLevel(logging.ERROR)

In [None]:
# Directory (on the mounted Drive) holding the saved CycleGAN models; it ends
# with "/" so file names can be appended directly.
MODELS_PATH = "/content/gdrive/MyDrive/Projects/GDL Images/models/CycleGAN/"

# One .h5 file per network: the two discriminators, then the two generators.
D_A_PATH = f"{MODELS_PATH}d_A.h5"
D_B_PATH = f"{MODELS_PATH}d_B.h5"
G_AB_PATH = f"{MODELS_PATH}g_AB.h5"
G_BA_PATH = f"{MODELS_PATH}g_BA.h5"

In [None]:
# Mount Google Drive at /content/gdrive/; the model and dataset paths used in this notebook live under it.
drive.mount('/content/gdrive/')

In [None]:
# Symlink the Drive-hosted dataset into /content: the monet.npy file and the "not/" directory.
# NOTE: `ln -s` without -f reports an error when the link already exists (e.g. if this cell is re-run).
!ln -s "/content/gdrive/MyDrive/Projects/GDL Images/dataset/monet.npy" "/content/monet.npy"
!ln -s "/content/gdrive/MyDrive/Projects/GDL Images/dataset/not/" "/content/"

In [None]:
def loadNpy(path):
  """Load a .npy file and return its contents divided by 255.

  The file is opened read-only as a memory map; the division then produces
  the scaled values that are returned.
  """
  raw_values = np.load(path, mmap_mode='r')
  return raw_values / 255

In [None]:
class DataRemember():
  """Bounded, shuffled buffer of samples.

  Holds at most `maxsize` items. When new items would overflow the buffer,
  items are dropped from the front of the (already shuffled) list to make
  room. `length` mirrors `len(array)` after every insertion.
  """
  def __init__(self, maxsize=1000):
    self.maxsize = maxsize
    self.array = []
    self.length = 0

  def AddToArray(self, add):
    """Insert every item of `add`, evicting stored items so the size never exceeds `maxsize`.

    Args:
      add: any sized iterable of samples (e.g. a list or an array iterated
        along its first axis).
    """
    incoming = list(add)
    # A batch bigger than the whole buffer cannot fit: keep only its last
    # `maxsize` items. Previously such a batch left the buffer over capacity.
    if len(incoming) > self.maxsize:
      incoming = incoming[len(incoming) - self.maxsize:]
    overflow = len(self.array) + len(incoming) - self.maxsize
    if overflow > 0:
      # The list is shuffled after each insertion, so dropping from the front
      # discards stored items in no particular order.
      self.array = self.array[overflow:]
    self.array.extend(incoming)
    random.shuffle(self.array)
    self.length = len(self.array)

  def ToNumpy(self):
    """Return the buffered items stacked into a new numpy array."""
    return np.array(self.array)

In [None]:
def buildGeneratorVGG19(input_shape, weight_init,name="generator"):
  """Build an encoder/decoder generator whose encoder reuses VGG19 weights.

  The encoder is five stacks of 3x3 ReLU convolutions (64, 128, 256, 512, 512
  filters), the first four followed by 2x2 max pooling. The decoder upsamples
  four times, concatenating an encoder output before each step, and ends in a
  3-channel sigmoid convolution.

  Args:
    input_shape: shape passed to the Keras `Input` layer.
    weight_init: kernel initializer for the decoder and output convolutions.
    name: name given to the returned model.

  Returns:
    An uncompiled Keras `Model`. Its leading layers (as many as VGG19 has,
    minus one) carry the ImageNet VGG19 weights and are marked non-trainable.
  """
  pretrained = VGG19(weights='imagenet',include_top=False)

  def encoder_block(tensor, n_filters, n_convs, pool=True):
    # Stack of same-padded 3x3 ReLU convolutions, optionally halved by max pooling.
    for _ in range(n_convs):
      tensor = Conv2D(n_filters,kernel_size=3, strides=1, padding='same', activation="relu")(tensor)
    return MaxPooling2D(pool_size=2)(tensor) if pool else tensor

  def decoder_block(tensor, n_filters, n_convs):
    # 2x upsampling followed by conv -> batch norm -> dropout, repeated n_convs times.
    tensor = UpSampling2D(size=(2,2))(tensor)
    for _ in range(n_convs):
      tensor = Conv2D(n_filters,kernel_size=3, strides=1, padding='same', activation="relu",kernel_initializer=weight_init)(tensor)
      tensor = BatchNormalization()(tensor)
      tensor = Dropout(0.08)(tensor)
    return tensor

  image_in = Input(shape=input_shape)
  enc1 = encoder_block(image_in, 64, 2)
  enc2 = encoder_block(enc1, 128, 2)
  enc3 = encoder_block(enc2, 256, 4)
  enc4 = encoder_block(enc3, 512, 4)
  bottleneck = encoder_block(enc4, 512, 4, False)

  # First decoder step merges the un-pooled fifth stack with the fourth stack's output.
  decoded = decoder_block(Concatenate()([bottleneck, enc4]), 512, 4)
  # Remaining steps prepend the matching encoder output before upsampling.
  for skip, n_filters, n_convs in ((enc3, 256, 4), (enc2, 128, 2), (enc1, 64, 2)):
    decoded = decoder_block(Concatenate()([skip, decoded]), n_filters, n_convs)

  rgb_out = Conv2D(3,kernel_size=3, strides=1, padding='same', activation="sigmoid",kernel_initializer=weight_init)(decoded)

  generator = Model(image_in, rgb_out, name=name)

  # Copy weights layer-by-layer (same index in both models) for every VGG19
  # layer except the last one, and freeze the receiving layers.
  source_layers = pretrained.layers
  target_layers = generator.layers
  for idx in range(len(source_layers) - 1):
    target_layers[idx].set_weights(source_layers[idx].get_weights())
    target_layers[idx].trainable = False
  return generator

In [None]:
def buildDiscriminator(input_shape,weight_init,name="disc"):
  """Build and compile a small convolutional discriminator.

  Three stride-2 4x4 ReLU convolutions (8, 16, 32 filters; batch norm on the
  last two) with max pooling after the second and third, followed by a
  single-filter stride-2 sigmoid convolution.

  Args:
    input_shape: shape passed to the Keras `Input` layer.
    weight_init: kernel initializer for every convolution.
    name: name given to the returned model.

  Returns:
    A Keras `Model` compiled with MSE loss, a default `Adam` optimizer and
    the 'accuracy' metric.
  """
  def strided_conv(tensor, n_filters, batch_norm=True, step=2):
    # 4x4 same-padded ReLU convolution, optionally followed by batch normalization.
    tensor = Conv2D(n_filters, kernel_size=4, strides=step, padding='same', kernel_initializer = weight_init, activation="relu")(tensor)
    return BatchNormalization()(tensor) if batch_norm else tensor

  image_in = Input(shape=input_shape)

  features = strided_conv(image_in, 8, batch_norm=False)
  features = strided_conv(features, 16)
  features = MaxPooling2D()(features)
  features = strided_conv(features, 32)
  features = MaxPooling2D()(features)
  patch_scores = Conv2D(1, kernel_size=2, strides=2, padding='same',kernel_initializer = weight_init, activation="sigmoid")(features)

  discriminator = Model(image_in, patch_scores, name=name)
  discriminator.compile(loss="mse", optimizer=Adam(), metrics=['accuracy'])
  return discriminator

In [None]:
class CycleGAN():
  """CycleGAN trainer: two generators (A->B, B->A) and two discriminators.

  The four networks are either built fresh or loaded from the given paths,
  and are written back to those paths while training. `train` must be called
  before `TrainDiscriminators`, `TrainGenerators` or `ShowProgress`, because
  it is what populates the `original_*_data` and `*_disc_ones` attributes.
  """
  def __init__(self, 
      input_dim,
      lambda_validation,
      lambda_reconstr,
      lambda_id,
      d_A_path,
      d_B_path,
      g_AB_path,
      g_BA_path,
      load=False
    ):
    """Create (or load) the four networks and compile the combined model.

    Args:
      input_dim: image shape used for every model input.
      lambda_validation: loss weight of the two discriminator-validity outputs.
      lambda_reconstr: loss weight of the two cycle-reconstruction outputs.
      lambda_id: loss weight of the two identity-mapping outputs.
      d_A_path, d_B_path, g_AB_path, g_BA_path: files the models are saved to
        (and loaded from when `load` is true).
      load: when true, read every model from its path instead of building it.
    """
    # NOTE(review): 0.02 is passed to Adam for the combined model; that is an
    # unusually large step size -- confirm it is intended.
    self.learning_rate = 0.02
    self.input_dim = input_dim
    self.lambda_validation = lambda_validation
    self.lambda_reconstr = lambda_reconstr
    self.lambda_id = lambda_id
    self.d_A_path = d_A_path
    self.d_B_path = d_B_path
    self.g_AB_path = g_AB_path
    self.g_BA_path = g_BA_path
    
    weight_init = RandomNormal(mean=0., stddev=0.02)
    #build models
    def LoadModelGenerator(name,path):
      if load:
        return tf.keras.models.load_model(path)
      return buildGeneratorVGG19(input_dim,weight_init,name)
    def LoadModelDiscriminator(name,path):
      if load:
        return tf.keras.models.load_model(path)
      return buildDiscriminator(input_dim,weight_init,name)
    self.d_A = LoadModelDiscriminator("d_A",d_A_path)
    self.d_B = LoadModelDiscriminator("d_B",d_B_path)
    self.g_AB = LoadModelGenerator("g_AB",g_AB_path)
    self.g_BA = LoadModelGenerator("g_BA",g_BA_path)
    # Per-sample discriminator output shape, stored as a plain tuple so it can
    # be concatenated with `(n,)` whatever shape type the backend returns.
    self.disc_patch = tuple(self.d_A.output.shape[1:])

    # Negatives for d_B; kept on the instance so they persist across calls.
    self.queue_not_B = DataRemember()

    self.compile_models()
      
  def compile_models(self):
    """Build and compile `self.combined`, the model used to train the generators."""
    # Freeze the discriminators before compiling. Keras documents that the
    # `trainable` state is captured at compile time, so the combined model is
    # compiled with them frozen to make sure it only updates the generators.
    self.d_A.trainable = False
    self.d_B.trainable = False

    # Input images from both domains
    img_A = Input(shape=self.input_dim)
    img_B = Input(shape=self.input_dim)

    # Translate images to the other domain
    fake_B = self.g_AB(img_A)
    fake_A = self.g_BA(img_B)

    # Translate images back to original domain
    reconstr_A = self.g_BA(fake_B)
    reconstr_B = self.g_AB(fake_A)

    # Identity mapping of images
    img_A_id = self.g_BA(img_A)
    img_B_id = self.g_AB(img_B)

    # Discriminators determines validity of translated images
    valid_A = self.d_A(fake_A)
    valid_B = self.d_B(fake_B)
    
    # Combined model trains generators to fool discriminators
    self.combined = Model(inputs=[img_A, img_B],
                          outputs=[valid_A, valid_B,
                                  reconstr_A, reconstr_B,
                                  img_A_id, img_B_id])
    self.combined.compile(loss=['mse', 'mse',
                                'mae', 'mae',
                                'mae', 'mae'],
                          loss_weights=[self.lambda_validation, self.lambda_validation,
                                        self.lambda_reconstr, self.lambda_reconstr,
                                        self.lambda_id, self.lambda_id],
                          optimizer=Adam(self.learning_rate, 0.5))

  def _fit_discriminator(self, disc, save_path, real, real_labels, negatives):
    """Fit one discriminator on `real` (label 1) versus the buffered negatives (label 0).

    After each 10-epoch round the model is saved and the user is prompted;
    training repeats only while the answer starts with "y".
    """
    x = np.concatenate((real, negatives.ToNumpy()))
    zeros = np.zeros((negatives.length,)+self.disc_patch)
    y = np.concatenate((real_labels, zeros))
    while True:
      disc.fit(x,y,shuffle=True,epochs=10,batch_size=32)
      disc.save(save_path)
      inp = input("Train Again? y for yes ")
      # The prompt asks for "y"; previously any non-empty answer (even "n")
      # triggered another round.
      if not inp.strip().lower().startswith("y"):
        break

  def TrainDiscriminators(self):
    """Train d_A and then d_B on the current A/B data plus freshly generated fakes."""
    self.d_A.trainable = True
    self.d_B.trainable = True
    fake_A = self.g_BA.predict(self.original_B_data)
    fake_B = self.g_AB.predict(self.original_A_data)

    # d_A negatives are rebuilt on every call; d_B negatives accumulate in
    # self.queue_not_B.
    queue_not_A = DataRemember()
    queue_not_A.AddToArray(self.original_B_data)
    queue_not_A.AddToArray(fake_A)
    self.queue_not_B.AddToArray(self.original_A_data)
    self.queue_not_B.AddToArray(fake_B)

    self._fit_discriminator(self.d_A, self.d_A_path,
                            self.original_A_data, self.d_A_disc_ones, queue_not_A)
    self._fit_discriminator(self.d_B, self.d_B_path,
                            self.original_B_data, self.d_B_disc_ones, self.queue_not_B)

  def TrainGenerators(self):
    """Fit the combined model for 2 epochs (batch size 2) on the current A/B data."""
    # For the combined model we will only train the generators
    self.d_A.trainable = False
    self.d_B.trainable = False
    self.combined.fit([self.original_A_data,self.original_B_data],
                      [self.d_A_disc_ones,self.d_B_disc_ones,
                       self.original_A_data,self.original_B_data,
                       self.original_A_data,self.original_B_data],
                      shuffle=True,epochs=2,batch_size=2)

  def SaveModels(self):
    """Save all four networks to their paths, with the discriminators marked trainable."""
    self.d_A.trainable = True
    self.d_B.trainable = True
    self.d_A.save(self.d_A_path)
    self.d_B.save(self.d_B_path)
    self.g_AB.save(self.g_AB_path)
    self.g_BA.save(self.g_BA_path)

  def train(self, path_A, path_B, times=5):
    """Run the training loop over every B file, in chunks the size of the A set.

    Args:
      path_A: .npy file with the domain-A images.
      path_B: glob pattern matching the domain-B .npy files; they are visited
        in random order.
      times: currently unused; kept for interface compatibility.

    Each B file is cut into consecutive slices of `len(A)` samples (a trailing
    remainder, or a file shorter than A, is not used). For each slice the
    discriminators and then the generators are trained, one sample
    translation per direction is displayed, and all models are saved.
    """
    files = glob.glob(path_B)
    random.shuffle(files)
    self.original_A_data = loadNpy(path_A)
    A_len = self.original_A_data.shape[0]
    self.d_A_disc_ones = np.ones((self.original_A_data.shape[0],)+self.disc_patch)
    for B_file in files:
      B_data = loadNpy(B_file)
      i = B_data.shape[0]//A_len
      for j in range(i):
        self.original_B_data = B_data[j*A_len:(j+1)*A_len]
        self.d_B_disc_ones = np.ones((self.original_B_data.shape[0],)+self.disc_patch)
        self.TrainDiscriminators()
        self.TrainGenerators()
        self.ShowProgress()
        self.ShowProgress(False)
        self.SaveModels()

  def ShowProgress(self, model_AB=True):
    """Display one randomly chosen source image and its translation.

    Args:
      model_AB: when true use g_AB on an A image, otherwise g_BA on a B image.
    """
    if model_AB:
      model = self.g_AB
      source = self.original_A_data
      print("A->B:")
    else:
      model = self.g_BA
      source = self.original_B_data
      print("B->A:")
    # randrange excludes its upper bound; randint(0, n) could return n and
    # index one past the end.
    img = source[random.randrange(source.shape[0])]
    generated = model.predict(np.array([img]))[0]
    # Data was normalised by /255 on load, so scale back by 255 for display.
    print("original:")
    cv2_imshow(img*255)
    print("generated:")
    cv2_imshow(generated*255)

In [None]:
# load=True: the four networks are read from the paths below instead of being built.
gan = CycleGAN(
  input_dim=(256,256,3),
  lambda_validation=1,
  lambda_reconstr=2,
  lambda_id=2,
  d_A_path=D_A_PATH,
  d_B_path=D_B_PATH,
  g_AB_path=G_AB_PATH,
  g_BA_path=G_BA_PATH,
  load=True,
)

In [None]:
# Domain A comes from a single .npy file (relative path); domain B from every file matching the glob.
gan.train(path_A="monet.npy", path_B="/content/not/*.npy")