In [1]:
import cv2
import sys
import glob
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras import Input
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.initializers import RandomNormal
from tensorflow.keras.layers import Concatenate
from tensorflow_addons.layers import InstanceNormalization
from tensorflow.keras.utils import plot_model
from IPython.display import clear_output


In [2]:
file_foggy = '/Users/boyang/Documents/projects/5469/5469project/data/training/foggy/*'
file_city = '/Users/boyang/Documents/projects/5469/5469project/data/training/city/*'

images_r = np.zeros(shape=(10,512,512,3))
images_c = np.zeros(shape=(10,512,512,3))
i=0
image_paths = glob.glob(file_foggy)
for file in image_paths:
    img = cv2.cvtColor(cv2.imread(file), cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (512,512), interpolation = cv2.INTER_AREA)
    img = np.array((img/127.5) - 1)
    images_r[i] = img
    i+=1
    if i%10==0:
        print("Foggy", i)
        break

i=0        
image_paths = sorted(glob.glob(file_city))
for file in image_paths:
    img = cv2.cvtColor(cv2.imread(file), cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (512,512), interpolation = cv2.INTER_AREA)
    img = np.array((img/127.5) - 1)
    images_c[i] = img
    i+=1
    if i%10==0:
        print("City", i)
        break

Foggy 10
City 10


In [3]:
train_X = images_r[:int(0.9*len(images_r))]
test_X = images_r[int(0.9*len(images_r)):]
train_y = images_c[:int(0.9*len(images_c))]
test_y = images_c[int(0.9*len(images_c)):]

In [4]:
# define the discriminator model
def define_discriminator(image_shape):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # source image input
    in_image = Input(shape=image_shape)
    # C64
    d = Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(in_image)
    d = LeakyReLU(alpha=0.2)(d)
    # C128
    d = Conv2D(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # C256
    d = Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # C512
    d = Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # second last output layer
    d = Conv2D(512, (4,4), padding='same', kernel_initializer=init)(d)
    d = InstanceNormalization(axis=-1)(d)
    d = LeakyReLU(alpha=0.2)(d)
    # patch output
    patch_out = Conv2D(1, (4,4), padding='same', kernel_initializer=init)(d)
    # define model
    model = Model(in_image, patch_out)
    # compile model
    model.compile(loss='mse', optimizer=Adam(lr=0.0002, beta_1=0.5), loss_weights=[0.5])
    return model
# define image shape
image_shape = (256,256,3)
# create the model
model = define_discriminator(image_shape)
# summarize the model
model.summary()
# plot the model
plot_model(model, to_file='discriminator_model_plot.png', show_shapes=True, show_layer_names=True)

Metal device set to: Apple M1


2022-12-01 11:37:56.049365: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2022-12-01 11:37:56.049549: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 128, 128, 64)      3136      
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 128, 128, 64)      0         
                                                                 
 conv2d_1 (Conv2D)           (None, 64, 64, 128)       131200    
                                                                 
 instance_normalization (Ins  (None, 64, 64, 128)      256       
 tanceNormalization)                                             
                                                                 
 leaky_re_lu_1 (LeakyReLU)   (None, 64, 64, 128)       0         
                                                             

  super().__init__(name, **kwargs)


In [5]:
# generator a resnet block
def resnet_block(n_filters, input_layer):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # first layer convolutional layer
    g = Conv2D(n_filters, (3,3), padding='same', kernel_initializer=init)(input_layer)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # second convolutional layer
    g = Conv2D(n_filters, (3,3), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    # concatenate merge channel-wise with input layer
    g = Concatenate()([g, input_layer])
    return g


In [6]:
# define the standalone generator model
def define_generator(image_shape, n_resnet=6):
  #  with strategy.scope():
  # Everything that creates variables should be under the strategy scope.
  # In general this is only model construction & `compile()`.
  
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # image input
    in_image = Input(shape=image_shape)
    # c7s1-64
    g = Conv2D(64, (7,7), padding='same', kernel_initializer=init)(in_image)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # d128
    g = Conv2D(128, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # d256
    g = Conv2D(256, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # R256
    for _ in range(n_resnet):
        g = resnet_block(256, g)
    # u128
    g = Conv2DTranspose(128, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # u64
    g = Conv2DTranspose(64, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    g = Activation('relu')(g)
    # c7s1-3
    g = Conv2D(3, (7,7), padding='same', kernel_initializer=init)(g)
    g = InstanceNormalization(axis=-1)(g)
    out_image = Activation('tanh')(g)
    # define model
    model = Model(in_image, out_image)
    return model
# define image shape
image_shape = (256,256,3)
# create the model
model = define_generator(image_shape)
# summarize the model
model.summary()
# plot the model
plot_model(model, to_file='generator_model_plot.png', show_shapes=True, show_layer_names=True)

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv2d_6 (Conv2D)              (None, 256, 256, 64  9472        ['input_2[0][0]']                
                                )                                                                 
                                                                                                  
 instance_normalization_4 (Inst  (None, 256, 256, 64  128        ['conv2d_6[0][0]']               
 anceNormalization)             )                                                           

                                                                                                  
 instance_normalization_13 (Ins  (None, 64, 64, 256)  512        ['conv2d_15[0][0]']              
 tanceNormalization)                                                                              
                                                                                                  
 activation_6 (Activation)      (None, 64, 64, 256)  0           ['instance_normalization_13[0][0]
                                                                 ']                               
                                                                                                  
 conv2d_16 (Conv2D)             (None, 64, 64, 256)  590080      ['activation_6[0][0]']           
                                                                                                  
 instance_normalization_14 (Ins  (None, 64, 64, 256)  512        ['conv2d_16[0][0]']              
 tanceNorm

In [7]:
# define a composite model for updating generators by adversarial and cycle loss
def define_composite_model(g_model_1, d_model, g_model_2, image_shape):
    # ensure the model we're updating is trainable
    g_model_1.trainable = True
    # mark discriminator as not trainable
    d_model.trainable = False
    # mark other generator model as not trainable
    g_model_2.trainable = False
    # discriminator element
    input_gen = Input(shape=image_shape)
    gen1_out = g_model_1(input_gen)
    output_d = d_model(gen1_out)
    # identity element
    input_id = Input(shape=image_shape)
    output_id = g_model_1(input_id)
    # forward cycle
    output_f = g_model_2(gen1_out)
    # backward cycle
    gen2_out = g_model_2(input_id)
    output_b = g_model_1(gen2_out)
    # define model graph
    model = Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
    # define optimization algorithm configuration
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    # compile model with weighting of least squares loss and L1 loss
    model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
    return model

In [8]:
# input shape
image_shape = (512,512,3)
# generator: A -> B
g_model_AtoB = define_generator(image_shape)
# generator: B -> A
g_model_BtoA = define_generator(image_shape)
# discriminator: A -> [real/fake]
d_model_A = define_discriminator(image_shape)
# discriminator: B -> [real/fake]
d_model_B = define_discriminator(image_shape)
# composite: A -> B -> [real/fake, A]
c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)

In [9]:
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape):
    # choose random instances
    ix = np.random.randint(0, dataset.shape[0], n_samples)
    # retrieve selected images
    X = dataset[ix]
    # generate 'real' class labels (1)
    y = np.ones((n_samples, patch_shape, patch_shape, 1))
    return X, y


# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, dataset, patch_shape):
    # generate fake instance
    X = g_model.predict(dataset)
    
    plt.figure(figsize=(6, 6))
    display_list = [dataset[0], X[0]]
    title = ['Input Image', 'Predicted Image']

    for i in range(2):
        plt.subplot(1, 2, i+1)
        plt.title(title[i])
        # getting the pixel values between [0, 1] to plot it.
        plt.imshow((display_list[i] + 1) * 0.5)
        plt.axis('off')
    plt.show()
            
    # create 'fake' class labels (0)
    y = np.zeros((len(X), patch_shape, patch_shape, 1))
    return X, y


# select a batch of real samples
# X_realA, y_realA = generate_real_samples(train_X, 1, 24)
# X_realB, y_realB = generate_real_samples(train_y, 1, 24)

# # generate a batch of fake samples
# X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, 24)
# X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, 24)

In [10]:
# update image pool for fake images
def update_image_pool(pool, images, max_size=50):
    selected = list()
    for image in images:
        if len(pool) < max_size:
            # stock the pool
            pool.append(image)
            selected.append(image)
        elif np.random.random() < 0.5:
            # use image, but don't add it to the pool
            selected.append(image)
        else:
            # replace an existing image and use replaced image
            ix = np.random.randint(0, len(pool))
            selected.append(pool[ix])
            pool[ix] = image
    return np.asarray(selected)

# update fakes from pool
#X_fakeA = update_image_pool(poolA, X_fakeA)
#X_fakeB = update_image_pool(poolB, X_fakeB)

In [11]:
# train function for the cyclegan models
def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA,trainB):

  # define properties of the training run
  n_epochs, n_batch, = 10, 1 #Keep n_batch as 1 for per image training. n_epochs can be changed
  # determine the output square shape of the discriminator
  n_patch = d_model_A.output_shape[1] #Notice n_patch depends on the discriminator output image size. This parameter can be changed for improvement.
  
  # prepare image pool for fakes
  poolA, poolB = list(), list()
  # calculate the number of batches per training epoch
  bat_per_epo = int(len(trainA) / n_batch)
  # calculate the number of training iterations
  n_steps = bat_per_epo * n_epochs
  # manually enumerate epochs
  print(n_steps)
  for i in range(n_steps):
  # select a batch of real samples
    X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch)
    X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch)
    
    # generate a batch of fake samples
    X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch)
    X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch)
    # update fakes from pool
    X_fakeA = update_image_pool(poolA, X_fakeA)
    X_fakeB = update_image_pool(poolB, X_fakeB)
    # update generator B->A via adversarial and cycle loss
    g_loss2, _, _, _, _ = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
    # update discriminator for A -> [real/fake]
    dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
    dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
    # update generator A->B via adversarial and cycle loss
    g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
    # update discriminator for B -> [real/fake]
    dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
    dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)
    # summarize performance
    print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
  return(g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA)

In [1]:
# load a dataset as a list of two numpy arrays
trainA = train_X
trainB = train_y
# train models
train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA, trainB)

NameError: name 'train_X' is not defined

In [None]:
from tensorflow.keras.models import load_model

g_model_AtoB.save('g_model_AtoB.hdf5')
g_model_BtoA.save('g_model_BtoA.hdf5')
c_model_AtoB.save('c_model_AtoB.hdf5')
c_model_BtoA.save('c_model_BtoA.hdf5')

g_model_AtoB = load_model('g_model_AtoB.hdf5')
g_model_BtoA = load_model('g_model_BtoA.hdf5')
c_model_AtoB = load_model('c_model_AtoB.hdf5')
c_model_BtoA = load_model('c_model_BtoA.hdf5')

In [32]:
import matplotlib.pyplot as plt
from random import randint

#write a function to generate 'n_samples' random integers in range (start,stop)
def randnums(start,stop,n_samples):
  ix=[]
  for i in range(n_samples):
    ix.append(randint(start,stop))
  ix=np.array(ix)
  return ix

def test(g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA,trainB):
###################START CODE HERE#############################
  X_realA=trainA[randnums(0, trainA.shape[0]-1, 1)]
  X_fakeB=g_model_AtoB.predict(X_realA)
  X_realB=trainB[randnums(0, trainB.shape[0]-1, 1)]
  X_fakeA=g_model_BtoA.predict(X_realB)
###############################################################    
  ########First A to B transformation
  print('Original A Image is=\n')
  plt.imshow(np.squeeze(X_realA))
  plt.show()
  print('Fake B image is=\n')
  plt.imshow(np.squeeze(X_fakeB))
  plt.show()
  #############Next B to A transformation
  print('Original B Image is=\n')
  plt.imshow(np.squeeze(X_realB))
  plt.show()
  print('Fake A image is=\n')
  plt.imshow(np.squeeze(X_fakeA))
  plt.show()

In [1]:
##Calling the test function here
test(g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA,trainB)

NameError: name 'test' is not defined

In [35]:
from skimage.metrics import mean_squared_error as mse, structural_similarity as ssim
#Function to compute similarity in Cycle images
def similarity_real_fake(real,fake):
  m = mse(real, fake)
  s = ssim(np.squeeze(real), np.squeeze(fake), multichannel=True)
  print('Original Image is=\n')
  plt.suptitle("MSE: %.2f, SSIM: %.2f" % (m, s))
  plt.imshow(np.squeeze(real))
  plt.show()
  print('Fake image is=\n')
  plt.imshow(np.squeeze(fake))
  plt.show()

def test_Cycle(g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA,trainB):

  # writting this function to call the similarity_real_fake function above and print it under each real->Generated image pair
  print('A->B->A full cycle\n')
  for i in range(3):
    X_realA=trainA[randnums(0, trainA.shape[0]-1, 1)]
    X_fakeB=g_model_AtoB.predict(X_realA)
    X_fakeA=g_model_BtoA.predict(X_fakeB)
    similarity_real_fake(X_realA,X_fakeA)

  print('B->A->B full cycle\n')
  for i in range(3):
    X_realB=trainB[randnums(0, trainA.shape[0]-1, 1)]
    X_fakeA=g_model_AtoB.predict(X_realB)
    X_fakeB=g_model_BtoA.predict(X_fakeA)
    similarity_real_fake(X_realB,X_fakeB)



In [2]:
# Calling the function test_Cycle(....)
test_Cycle(g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, trainA,trainB)

NameError: name 'test_Cycle' is not defined