# This notebook contains the demo for all aspects of Steganography GAN

Our goal is to crack the Least Significant Bit Stenganograpy algorithm using Cycle generative Adversarial Networks. Specifically, we want to train our models the understand how to translate between an encoded image and a decoded image. On top of this, we implement different techniques to show the success of our model and how to improve it. 

Before running the cells in this notebook, make sure to have gone through the README.md. Specifically, make sure to:

1. Have the dataset properly downloaded
2. (Optionally - Our repository contains all the images needed to test for bit-size 7) Pre-process the dataset to create all necessary images
3. (Optionally) Download the pre-trained weights

The cells below will walk through CycleGANs, Autoencoders, Bayesian Optimization, Saved Weights, and Bit Size Training. 

(We will run the cells that show the results using our pre-trained weights)

## Import all nessesary packages for all models

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals
from tensorflow_examples.models.pix2pix import pix2pix
from sten import Sten
from matplotlib.image import imread
from IPython.display import clear_output
from tqdm.auto import tqdm, trange
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
from tensorflow.keras.models import Sequential
from tensorflow.python.keras.utils.data_utils import Sequence
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D, Activation
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, UpSampling2D
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from bayes_opt import BayesianOptimization
from bayes_opt.logger import JSONLogger
from bayes_opt.event import Events

import os
import time
import glob
import random
import subprocess
import numpy as np
import tensorflow as tf
import matplotlib.image as mpimg
import matplotlib.pyplot as plt

AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
subprocess.call(['./get_data'])

In [None]:
subprocess.call(['./get_checkpoints_mini'])

In [None]:
!python preprocess_mini.py

## Enable GPU capabilities

In [None]:
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)

## Helper functions to show the progress of our models as they train

This section can be easily replaced with some other tool such as tqdm. 

In [None]:
def update_progress(max_epoch, epoch, progress, episode):
    bar_length = 50
    if isinstance(progress, int):
        progress = float(progress)
    if not isinstance(progress, float):
        progress = 0
    if progress < 0:
        progress = 0
    if progress >= 1:
        progress = 1

    block = int(round(bar_length * progress))

    clear_output(wait = True)
    text = "Episode {0}, Progress: [{1}] {2:.1f}%, Epoch {3}/{4}".format(episode, "=" * block + "." * (bar_length - block), progress * 100, epoch, max_epoch)
    print(text)

## CycleGAN: The following code shows how to run the CycleGAN code from scratch or with the use of saved weights and also how to use the Bit Size Training technique

The following section contains the main results of our research. In particular, this section will show how to implement Cycle Generative Adversarial Networks. 

These are the main hyper-parameters that can be altered for this model. The STEN_X specifies to the model which bit size to use.

In [None]:
EPOCHS = 100
STEPS_PER_EPOCH = 50
LAMBDA = 10
SAVE_RATE = 10
STEN_X = 7

The generate_images function prints and saves the cover image, hidden image, encoded image, decoded image, the image generated by Generator G (decoded image), and the image generated by generator F (encoded image)

In [None]:
def generate_images(generator_g, generator_f, cover_imgs, hidden_imgs, encoded_imgs, decoded_imgs, save_name):
    fig, axs = plt.subplots(len(cover_imgs), 6, figsize=(25,25))
    cols = ["Cover", "Hidden", "Encoded", "Decoded", "Generator G", "Generator F"]
    for x in range(len(cover_imgs)):
        predicted_g = generator_g.predict(np.asarray([encoded_imgs[x]]))
        predicted_f = generator_f.predict(np.asarray([predicted_g[0]]))
        display_list = [cover_imgs[x], hidden_imgs[x], encoded_imgs[x], decoded_imgs[x], predicted_g[0], predicted_f[0]]
        [axs[x, y].imshow(display_list[y] * 0.5 + 0.5) for y in range(6)]
    [ax.set_title(col,fontsize=40) for ax, col in zip(axs[0], cols)]
    [axi.set_axis_off() for axi in axs.ravel()]
    fig.tight_layout()
    plt.show()
    fig.savefig(save_name)

The discriminator_loss calculates the loss for the discriminator model in the CycleGAN

In [None]:
def discriminator_loss(a, generated):
    a_loss = loss_obj(tf.ones_like(a), a)
    generated_loss = loss_obj(tf.zeros_like(generated), generated)
    total_disc_loss = a_loss + generated_loss
    return total_disc_loss * 0.5

The generator_loss calculates the loss for the generator model in the CycleGAN

In [None]:
def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

The calc_cycle_loss calculates the loss for a single cycle with an encoded image through the CycleGAN

In [None]:
def calc_cycle_loss(a_image, cycled_image):
    loss1 = tf.reduce_mean(tf.abs(a_image - cycled_image))
    return LAMBDA * loss1

The identity_loss calculates the identity loss for an image and the same image passed into the corresponding generator which should not alter the image

In [None]:
def identity_loss(a_image, same_image):
    loss = tf.reduce_mean(tf.abs(a_image - same_image))
    return LAMBDA * 0.5 * loss

Our model runs through the train_step for every encoded and decoded image passed into it. This is the function that changes the weights of our model as it progresses

In [None]:
@tf.function
def train_step(a_x, a_y):
    with tf.GradientTape(persistent=True) as tape:
            
        fake_y = generator_g(a_x, training=True)
        cycled_x = generator_f(fake_y, training=True)

        fake_x = generator_f(a_y, training=True)
        cycled_y = generator_g(fake_x, training=True)

        same_x = generator_f(a_x, training=True)
        same_y = generator_g(a_y, training=True)

        disc_a_x = discriminator_x(a_x, training=True)
        disc_a_y = discriminator_y(a_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True)
        disc_fake_y = discriminator_y(fake_y, training=True)

        gen_g_loss = generator_loss(disc_fake_y)
        gen_f_loss = generator_loss(disc_fake_x)

        total_cycle_loss = calc_cycle_loss(a_x, cycled_x) + calc_cycle_loss(a_y, cycled_y)

        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(a_y, same_y)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(a_x, same_x)

        disc_x_loss = discriminator_loss(disc_a_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_a_y, disc_fake_y)
  
    generator_g_gradients = tape.gradient(total_gen_g_loss, 
                                        generator_g.trainable_variables)
    generator_f_gradients = tape.gradient(total_gen_f_loss, 
                                        generator_f.trainable_variables)

    discriminator_x_gradients = tape.gradient(disc_x_loss, 
                                            discriminator_x.trainable_variables)
    discriminator_y_gradients = tape.gradient(disc_y_loss, 
                                            discriminator_y.trainable_variables)

    generator_g_optimizer.apply_gradients(zip(generator_g_gradients, 
                                            generator_g.trainable_variables))

    generator_f_optimizer.apply_gradients(zip(generator_f_gradients, 
                                            generator_f.trainable_variables))

    discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients,
                                                discriminator_x.trainable_variables))

    discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients,
                                                discriminator_y.trainable_variables))

Prepare the generator models, discriminator models, and loss fucntion for training

In [None]:
OUTPUT_CHANNELS = 3

generator_g = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')
generator_f = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')

discriminator_x = pix2pix.discriminator(norm_type='instancenorm', target=False)
discriminator_y = pix2pix.discriminator(norm_type='instancenorm', target=False)

In [None]:
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

Specify the checkpoints files location and load the weights if they exist for a specific bit size

In [None]:
checkpoint_path = "checkpoints/cycle_gan_{0}".format(STEN_X)

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=2)

In [None]:
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

Specify the checkpoints files location and load the weights if they exist for the Bit Size Training Technique

In [None]:
checkpoint_path = "checkpoints/cycle_gan_bit_size"

ckpt = tf.train.Checkpoint(generator_g=generator_g,
                           generator_f=generator_f,
                           discriminator_x=discriminator_x,
                           discriminator_y=discriminator_y,
                           generator_g_optimizer=generator_g_optimizer,
                           generator_f_optimizer=generator_f_optimizer,
                           discriminator_x_optimizer=discriminator_x_optimizer,
                           discriminator_y_optimizer=discriminator_y_optimizer)

ckpt_manager_bit = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=2)

In [None]:
if ckpt_manager_bit.latest_checkpoint:
    ckpt.restore(ckpt_manager_bit.latest_checkpoint)
    print ('Latest checkpoint restored!!')

Bit Size Training Technique

After the end of every epoch, the bit size used is randomly choosen. This allows for each epoch to be trained using a new bit size. At the end of training, the checkpoint is saved. 

In [None]:
for epoch in trange(EPOCHS, desc="Epochs"):
    for x in trange(STEPS_PER_EPOCH, desc="Steps per Epoch"):
        STEN_X = np.random.randint(0,9)
        try:
            set_1 = np.random.randint(1,11)
            set_2 = np.random.randint(1,6)
            name = str(set_1) + '_' + str(set_2)
            encoded = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            decoded = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            train_step(np.asarray([encoded/255.0], dtype='float32'), np.asarray([decoded/255.0], dtype='float32'))
        except:
            print("Error")
ckpt_manager_bit.save()

Training the CycleGAN model for a specific bit size

In [None]:
for epoch in trange(EPOCHS, desc="Epochs"):
    for x in trange(STEPS_PER_EPOCH, desc="Steps per Epoch"):
        try:
            set_1 = np.random.randint(1,11)
            set_2 = np.random.randint(1,6)
            name = str(set_1) + '_' + str(set_2)
            encoded = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            decoded = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            train_step(np.asarray([encoded/255.0], dtype='float32'), np.asarray([decoded/255.0], dtype='float32'))
        except:
            print("Error")
ckpt_manager.save()

Test the model and generate the image for the CycleGAN algorithm

In [None]:
cover_imgs = np.empty((0, 256, 256, 3))
hidden_imgs = np.empty((0, 256, 256, 3))
encoded_imgs = np.empty((0, 256, 256, 3))
decoded_imgs = np.empty((0, 256, 256, 3))
for x in range(5):
    set_1 = x+11
    set_2 = x+6
    name = str(set_1) + '_' + str(set_2)
    cover = mpimg.imread(os.getcwd() + "/data/set1/{0}.jpg".format(set_1)) / 255.0
    hidden = mpimg.imread(os.getcwd() + "/data/set2/{0}.jpg".format(set_2)) / 255.0
    encoded = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name)) / 255.0
    decoded = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name)) / 255.0
    cover_imgs = np.row_stack((cover_imgs, np.asarray([cover])))
    hidden_imgs = np.row_stack((hidden_imgs, np.asarray([hidden])))
    encoded_imgs = np.row_stack((encoded_imgs, np.asarray([encoded])))
    decoded_imgs = np.row_stack((decoded_imgs, np.asarray([decoded])))

Images for Bit Size Training Technique

In [None]:
generate_images(generator_g, generator_f, cover_imgs, hidden_imgs, encoded_imgs, decoded_imgs, "bit_size_training.png")

Images for specific bit size training

In [None]:
generate_images(generator_g, generator_f, cover_imgs, hidden_imgs, encoded_imgs, decoded_imgs, "cycle_gan_{0}.png".format(STEN_X))

## Autoencoder: The following code shows how to run the Autoencoder code from scratch or with the use of saved weights

The following section contains the results of our Autoencoder to compare to our CycleGAN.

These are the main hyper-parameters that can be altered for this model. The STEN_X specifies to the model which bit size to use.

In [None]:
BATCH_SIZE = 1
EPOCHS = 50
IMG_ROWS, IMG_COLS = 256, 256
SAVE_RATE = 100
STEN_X = 7

This is a Generator for Keras to load the data but not hold all of it in memory

In [None]:
class Generator(Sequence) :
    def __init__(self, image_filenames, batch_size) :
        self.image_filenames = image_filenames
        self.batch_size = batch_size
    def __len__(self) :
        return (np.ceil(len(self.image_filenames) / float(self.batch_size))).astype(np.int)
    def __getitem__(self, idx) :
        batch_x = self.image_filenames[idx * self.batch_size : (idx+1) * self.batch_size]
        X = []
        Y = []
        for file in batch_x:
            name = file[21:-4]
            X.append(np.load(file) / 255.0)
            Y.append(np.load("./decodedArray/bit_{0}/{1}.npy".format(STEN_X, name)) / 255.0)
        X = np.asarray(X)
        Y = np.asarray(Y)
        return X, Y

The generate_images function prints and saves the cover image, hidden image, encoded image, decoded image, and the predicted image

In [None]:
def generate_images(model, cover_imgs, hidden_imgs, encoded_imgs, decoded_imgs, save_name):
    fig, axs = plt.subplots(len(cover_imgs), 5, figsize=(15,15))
    cols = ["Cover", "Hidden", "Encoded", "Decoded", "Predicted"]
    for x in range(len(cover_imgs)):
        predicted = model.predict(np.asarray([encoded]))
        display_list = [cover_imgs[x], hidden_imgs[x], encoded_imgs[x], decoded_imgs[x], predicted[0]]
        [axs[x, y].imshow(display_list[y] * 0.5 + 0.5) for y in range(5)]
    [ax.set_title(col,fontsize=40) for ax, col in zip(axs[0], cols)]
    [axi.set_axis_off() for axi in axs.ravel()]
    fig.tight_layout()
    plt.show()
    fig.savefig(save_name)

Create the model

In [None]:
shape = (IMG_ROWS, IMG_COLS, 3)

In [None]:
autoencoder = Sequential()

autoencoder.add(Conv2D(64, (8, 8), padding='same', input_shape=shape))
autoencoder.add(Activation('relu'))
autoencoder.add(MaxPooling2D(pool_size=(2,2), padding='same'))

autoencoder.add(Conv2D(32,(5, 5), padding='same'))
autoencoder.add(Activation('relu'))
autoencoder.add(MaxPooling2D(pool_size=(2,2), padding='same'))

autoencoder.add(Conv2D(32,(5, 5), padding='same'))
autoencoder.add(Activation('relu'))
autoencoder.add(UpSampling2D((2, 2)))

autoencoder.add(Conv2D(64,(8, 8), padding='same'))
autoencoder.add(Activation('relu'))
autoencoder.add(UpSampling2D((2, 2)))

autoencoder.add(Conv2D(3,(3, 3), padding='same'))
autoencoder.add(Activation('sigmoid'))

autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy', metrics=['accuracy'])

Load the checkpoints

In [None]:
checkpoint_path = "checkpoints/autoencoder_{0}".format(STEN_X)
ckpt = tf.train.Checkpoint(autoencoder=autoencoder)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
if ckpt_manager.latest_checkpoint:
    ckpt.restore(ckpt_manager.latest_checkpoint)
    print ('Latest checkpoint restored!!')

Train the Autoencoder model with a Generator

In [None]:
files = glob.glob("./encodedArray/bit_{0}/*".format(STEN_X))
train = files
train_gen = Generator(train, BATCH_SIZE)
autoencoder.fit_generator(generator=train_gen,
                    epochs = EPOCHS,
                    verbose = 1,
                    shuffle = True)

In [None]:
ckpt_manager.save()

Test the model and generate the image for the CycleGAN algorithm

In [None]:
cover_imgs = np.empty((0, 256, 256, 3))
hidden_imgs = np.empty((0, 256, 256, 3))
encoded_imgs = np.empty((0, 256, 256, 3))
decoded_imgs = np.empty((0, 256, 256, 3))
for x in range(5):
    set_1 = x+11
    set_2 = x+6
    name = str(set_1) + '_' + str(set_2)
    cover = mpimg.imread(os.getcwd() + "/data/set1/{0}.jpg".format(set_1)) / 255.0
    hidden = mpimg.imread(os.getcwd() + "/data/set2/{0}.jpg".format(set_2)) / 255.0
    encoded = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name)) / 255.0
    decoded = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name)) / 255.0
    cover_imgs = np.row_stack((cover_imgs, np.asarray([cover])))
    hidden_imgs = np.row_stack((hidden_imgs, np.asarray([hidden])))
    encoded_imgs = np.row_stack((encoded_imgs, np.asarray([encoded])))
    decoded_imgs = np.row_stack((decoded_imgs, np.asarray([decoded])))

Images for Autoencoder for specific bit size

In [None]:
generate_images(autoencoder, cover_imgs, hidden_imgs, encoded_imgs, decoded_imgs, "autoencoder_{}.png".format(STEN_X))

## Bayesian Optimization with CycleGAN: The following code shows how to run Bayesian Optimzation and save the results

These are the main hyper-parameters that can be altered for this model. The STEN_X specifies to the model which bit size to use.

In [None]:
STEN_X = 7
MAX0 = 11
MAX1 = 6
EPOCHS_RANGE = (200, 500)
LAMBDA_RANGE = (8, 12)
STEPS_PER_EPOCH_RANGE = (10, 50)

The discriminator_loss calculates the loss for the discriminator model in the CycleGAN

In [None]:
def discriminator_loss(real, generated):
    real_loss = loss_obj(tf.ones_like(real), real)
    generated_loss = loss_obj(tf.zeros_like(generated), generated)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss * 0.5

The generator_loss calculates the loss for the generator model in the CycleGAN

In [None]:
def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

The calc_cycle_loss calculates the loss for a single cycle with an encoded image through the CycleGAN

In [None]:
def calc_cycle_loss(LAMBDA, real_image, cycled_image):
    loss1 = tf.reduce_mean(tf.abs(real_image - cycled_image))
    return int(LAMBDA) * loss1

The identity_loss calculates the identity loss for an image and the same image passed into the corresponding generator which should not alter the image

In [None]:
def identity_loss(LAMBDA, real_image, same_image):
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return int(LAMBDA) * 0.5 * loss

Our model runs through the train_step for every encoded and decoded image passed into it. This is the function that changes the weights of our model as it progresses. This step has been modified to run with Bayesian Optmization.

In [None]:
@tf.function
def train_step(LAMBDA,real_x, real_y): 
    with tf.GradientTape(persistent=True) as tape:
        fake_y = generator_g(real_x, training=True)

        cycled_x = generator_f(fake_y, training=True) 

        fake_x = generator_f(real_y, training=True) 
        cycled_y = generator_g(fake_x, training=True) 

        same_x = generator_f(real_x, training=True) 
        same_y = generator_g(real_y, training=True) 

        disc_real_x = discriminator_x(real_x, training=True) 
        disc_real_y = discriminator_y(real_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True) 
        disc_fake_y = discriminator_y(fake_y, training=True)

        gen_g_loss = generator_loss(disc_fake_y)  
        gen_f_loss = generator_loss(disc_fake_x) 

        total_cycle_loss = calc_cycle_loss(LAMBDA,real_x, cycled_x) + calc_cycle_loss(LAMBDA,real_y, cycled_y)

        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(LAMBDA, real_y, same_y)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(LAMBDA, real_x, same_x)

        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x) 
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y) 

    generator_g_gradients = tape.gradient(total_gen_g_loss,generator_g.trainable_variables)
    generator_f_gradients = tape.gradient(total_gen_f_loss, generator_f.trainable_variables)
    discriminator_x_gradients = tape.gradient(disc_x_loss,discriminator_x.trainable_variables)
    discriminator_y_gradients = tape.gradient(disc_y_loss, discriminator_y.trainable_variables)

    generator_g_optimizer.apply_gradients(zip(generator_g_gradients,generator_g.trainable_variables))
    generator_f_optimizer.apply_gradients(zip(generator_f_gradients, generator_f.trainable_variables))
    discriminator_x_optimizer.apply_gradients(zip(discriminator_x_gradients,discriminator_x.trainable_variables))
    discriminator_y_optimizer.apply_gradients(zip(discriminator_y_gradients,discriminator_y.trainable_variables))

Loss function for Bayesian Optimization

In [None]:
def mse(imageA, imageB):
    err = np.sum((imageA.astype("float") - imageB.astype("float")) ** 2)
    err /= float(imageA.shape[0] * imageA.shape[1])
    return -err

Prepare the generator models, discriminator models, and loss fucntion for training

In [None]:
OUTPUT_CHANNELS = 3

generator_g = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')
generator_f = pix2pix.unet_generator(OUTPUT_CHANNELS, norm_type='instancenorm')

discriminator_x = pix2pix.discriminator(norm_type='instancenorm', target=False)
discriminator_y = pix2pix.discriminator(norm_type='instancenorm', target=False)

In [None]:
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [None]:
generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

This is the specified Black-box function that Bayesian Optimization will maximize

In [None]:
def Black_Box(EPOCHS,LAMBDA,steps_per_epochs):
    for epoch in trange(int(EPOCHS),desc='epochs'):
        for _ in trange(int(steps_per_epochs), desc='steps_per_epochs'):
            i = np.random.randint(1,MAX0)
            j = np.random.randint(1,MAX1)
            name = str(i) + '_' + str(j)
            image_x = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            image_y = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            train_step(LAMBDA,np.asarray([image_x/255.0], dtype='float32'), np.asarray([image_y/255.0], dtype='float32'))
    sum = 0.0
    for i in trange(1,MAX0):
        for j in trange(1,MAX1):
            name = str(i) + '_' + str(j)
            image_x = np.load(os.getcwd() + "/encodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            image_y = np.load(os.getcwd() + "/decodedArray/bit_{0}/{1}.npy".format(STEN_X, name))
            sum += mse(generator_g.predict(np.asarray([image_x/255.0], dtype='float32')), np.asarray([image_y/255.0], dtype='float32'))
    avg = sum / ((MAX0-1)*(MAX1-1))
    return avg

Bounds for the Bayesian Optimization to explore

In [None]:
bounds = {
    'EPOCHS': EPOCHS_RANGE,
    'LAMBDA': LAMBDA_RANGE,
    'steps_per_epochs': STEPS_PER_EPOCH_RANGE 
}

In [None]:
optimizer = BayesianOptimization(
    f = Black_Box,
    pbounds = bounds,
    random_state = 1
)

Save the results to a log file

In [None]:
logger = JSONLogger(path="./logs_{0}.json".format(STEN_X))
optimizer.subscribe(Events.OPTIMIZATION_STEP, logger)

Run Bayesian Optimization

In [None]:
optimizer.maximize(init_points=2,n_iter=10)