In [None]:
# HiDDeN / complete-30-gan-closs.py
# Gourav Siddhad
# 21-Apr-2019

In [None]:
from __future__ import absolute_import, division, print_function

from numpy.random import seed
seed(1607)
from tensorflow import set_random_seed
set_random_seed(1607)

print('Importing Libraries', end='')

import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
from numpy.random import seed
import random
import re
import os
import pandas as pd
import time
import csv
import math

from PIL import Image, ImageFile

from skimage import io
from skimage.io import imread, imshow
from sklearn.utils import shuffle
from sklearn.preprocessing import MultiLabelBinarizer

import tensorflow as tf
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import keras
import keras.backend as K
from keras.models import Model, Sequential, load_model
from keras.layers import Activation, Input, Dense, Conv2D, BatchNormalization, ReLU
from keras.layers import GlobalAveragePooling2D, Flatten, Reshape, Lambda, Add
from keras import optimizers
from keras.preprocessing import sequence
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

from keras.utils import to_categorical, plot_model
from keras.callbacks import EarlyStopping, ModelCheckpoint, CSVLogger
from keras.layers.merge import concatenate

from keras.utils.vis_utils import plot_model
import matplotlib.pyplot as plt
%matplotlib inline

print(' - Done')

In [None]:
# Model configuration: image height / width / channels and message length (bits).
H = W = 128
C = 3
L = 30

# One (initially empty) message container per dataset split.
all_messages_train, all_messages_validate, all_messages_test = [], [], []

# Training options.
batch_size, n_epochs = 12, 200

# Folder that holds the message CSV files and receives checkpoints and plots.
modelpath = 'models/'

# Source image folders, one per split.
train_folder = 'dataset128/original/train/'
validate_folder = 'dataset128/original/validate/'
test_folder = 'dataset128/original/test/'

# Echo the paths so the run log records which data was used.
print('Model    - ', modelpath)
print('Train    - ', train_folder)
print('Validate - ', validate_folder)
print('Test     - ', test_folder)

In [None]:
print('Reading Image List', end='')
# NOTE(review): os.listdir() returns names in arbitrary order, while messages
# are later paired with images by position - confirm the CSV files were
# written using the same ordering.
train_list = os.listdir(train_folder)
validate_list = os.listdir(validate_folder)
test_list = os.listdir(test_folder)
print(' - Done')

print('Train    - ', len(train_list))
print('Validate - ', len(validate_list))
print('Test - ', len(test_list))
print()


def _read_messages(csv_path):
    """Return every other row (rows 0, 2, 4, ...) of the CSV file at `csv_path`.

    Each returned row is the list of string fields produced by `csv.reader`.
    The odd-numbered rows are skipped, exactly as before; the parity check now
    uses `==` instead of the identity operator `is`, which only worked thanks
    to CPython's small-integer caching and raises a SyntaxWarning on Python 3.8+.
    The `with` block closes the file, so no explicit close() is needed.
    """
    with open(csv_path) as csv_file:
        return [row for line_no, row in enumerate(csv.reader(csv_file))
                if line_no % 2 == 0]


print('Reading Messages', end='')
message_train = _read_messages(modelpath + str(L) + '-train-msg.csv')
message_validate = _read_messages(modelpath + str(L) + '-validate-msg.csv')
message_test = _read_messages(modelpath + str(L) + '-test-msg.csv')
print(' - Done')

print('Train Messages    - ', len(message_train))
print('Validate Messages - ', len(message_validate))
print('Test Messages     - ', len(message_test))

In [None]:
def gen_msg_plates(message, height=None, width=None):
    """Expand a bit sequence into one constant 2-D plane per bit.

    Args:
        message: iterable of symbols; a symbol equal to '1' yields a plane of
            ones, a symbol equal to '0' yields a plane of zeros, and any other
            symbol is skipped.
        height: plane height; defaults to the notebook-level `H`.
        width: plane width; defaults to the notebook-level `W`.

    Returns:
        Array of shape (number_of_bits, height, width) and dtype float32.
        When no symbol matches, the result is an empty array.
    """
    rows = H if height is None else height
    cols = W if width is None else width

    plates = []
    for bit in message:
        # Compare by value: `is` tests object identity and only appeared to
        # work for strings that the interpreter happens to share.
        if bit == '1':
            plates.append(np.ones((rows, cols), dtype='float32'))
        elif bit == '0':
            plates.append(np.zeros((rows, cols), dtype='float32'))
    return np.array(plates)

def gen_msg(message):
    """Convert a bit sequence into a flat float32 vector.

    Args:
        message: iterable of symbols; a symbol equal to '1' becomes 1.0, a
            symbol equal to '0' becomes 0.0, and any other symbol is skipped.

    Returns:
        1-D float32 array with one entry per recognised symbol.
    """
    bits = []
    for symbol in message:
        # Value comparison instead of `is`: identity against a string literal
        # depends on interning and raises a SyntaxWarning on Python 3.8+.
        if symbol == '1':
            bits.append(1.0)
        elif symbol == '0':
            bits.append(0.0)
    return np.array(bits, dtype='float32')

In [None]:
# Loss weights passed as `loss_weights` when the combined model is compiled:
# lamI scales the encoder (image) loss and lamG the adversarial loss.
lamI = 0.7
lamG = 0.001


def encoder_loss(y_true, y_pred):
    """L2 norm of the error along the last axis, divided by C * H * W."""
    squared_error = K.square(y_pred - y_true)
    distance = K.sqrt(K.sum(squared_error, axis=-1))
    return distance / (C * H * W)


def decoder_loss(y_true, y_pred):
    """L2 norm of the error along the last axis, divided by the message length L."""
    squared_error = K.square(y_pred - y_true)
    distance = K.sqrt(K.sum(squared_error, axis=-1))
    return distance / L


def adversary_loss_alone(y_true, y_pred):
    """Log loss used when the adversary is trained on its own.

    NOTE(review): `K.log(1.0 - y_true)` is log(0) for every label equal to 1,
    and the training loop labels encoded images with 1. `K.log(y_pred)` is
    also undefined for non-positive predictions, and the adversary's output
    layer is linear. Confirm this formula is what was intended.
    """
    label_term = K.log(1.0 - y_true)
    prediction_term = K.log(y_pred)
    return label_term + prediction_term


def adversary_loss(y_true, y_pred):
    """Log loss applied to the adversary output inside the combined model.

    `y_true` is accepted for Keras' loss signature but is not used.
    NOTE(review): the log is undefined for predictions >= 1 - confirm the
    adversary output range.
    """
    return K.log(1.0 - y_pred)


# NOTE(review): this one optimizer instance is handed to every compile() call
# in the notebook (encoder, decoder, adversary and combined model) - confirm
# that sharing optimizer state across models is intended.
adam = optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999)

In [None]:
print('Encoder Network')
# Encoder
def build_encoder():
    """Build and compile the encoder network.

    Inputs:
        * cover image of shape (H, W, C)
        * message planes of shape (L, H, W), one constant plane per bit

    The image goes through four Conv-BatchNorm-ReLU blocks (64 filters each).
    The resulting features are concatenated along the channel axis with the
    original image and the message planes, passed through one more block and
    finally through a 1x1 convolution producing C channels.

    Returns:
        Keras Model compiled with `encoder_loss` and the shared `adam` optimizer.
    """
    # Local import keeps this cell self-contained; Permute is not among the
    # layers imported at the top of the notebook.
    from keras.layers import Permute

    def conv_bn_relu(filters):
        # One 3x3 'same' convolution followed by batch-norm and ReLU.
        # NOTE(review): in Keras `momentum` weights the running statistics
        # (library default 0.99); 0.1 looks like a value ported from another
        # framework - confirm.
        return Sequential([Conv2D(filters=filters, kernel_size=(3, 3), strides=(1, 1), padding='same'),
                           BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.1, center=True, scale=True),
                           ReLU(max_value=None, negative_slope=0.0, threshold=0.0)])

    input_e = Input(shape=(H, W, C))
    input_m = Input(shape=(L, H, W))

    features = input_e
    for _ in range(4):
        features = conv_bn_relu(64)(features)

    # Move the bit axis to the channel position: (L, H, W) -> (H, W, L).
    # A plain Reshape keeps the flat memory order, so every pixel would receive
    # a mixture of planes instead of the full L-bit message; Permute keeps
    # channel c of every pixel equal to bit c.
    layer_r = Permute((2, 3, 1))(input_m)

    # Integrate Message to Model
    merge = concatenate([features, input_e, layer_r])
    layer_e5 = conv_bn_relu(64)(merge)
    output_e = Conv2D(filters=C, kernel_size=(1, 1), strides=(1, 1), padding='same', name='Enc')(layer_e5)

    model_e = Model(inputs=[input_e, input_m], outputs=output_e)
    model_e.compile(loss=encoder_loss, optimizer=adam, metrics=['accuracy'])
    return model_e

In [None]:
print('Decoder Network')
# Decoder
def build_decoder():
    """Build and compile the decoder network.

    An (H, W, C) image passes through seven Conv-BatchNorm-ReLU blocks with 64
    filters and an eighth block with L filters. Global average pooling then
    reduces the result to a vector of length L, which is returned through a
    linear activation.

    Returns:
        Keras Model compiled with `decoder_loss`, the shared `adam` optimizer
        and the 'binary_accuracy' metric.
    """
    input_d = Input(shape=(H, W, C))

    hidden = input_d
    # Seven 64-filter blocks followed by one block with a filter per message bit.
    for n_filters in [64] * 7 + [L]:
        block = Sequential([Conv2D(filters=n_filters, kernel_size=(3, 3), strides=(1, 1), padding='same'),
                            BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.1, center=True, scale=True),
                            ReLU(max_value=None, negative_slope=0.0, threshold=0.0)])
        hidden = block(hidden)

    pooled = GlobalAveragePooling2D(data_format=None)(hidden)
    output_d = Activation('linear')(pooled)

    model_d = Model(inputs=input_d, outputs=output_d)
    model_d.compile(loss=decoder_loss, optimizer=adam, metrics=['binary_accuracy'])
    return model_d

In [None]:
print('Adversary Network')
# Adversary
def build_adversary():
    """Build and compile the adversary network.

    An (H, W, C) image passes through three Conv-BatchNorm-ReLU blocks with 64
    filters, global average pooling and a single-unit Dense layer with a
    linear activation.

    Returns:
        Keras Model compiled with `adversary_loss_alone`, the shared `adam`
        optimizer and the 'accuracy' metric.
    """
    input_a = Input(shape=(H, W, C))

    hidden = input_a
    for _ in range(3):
        block = Sequential([Conv2D(filters=64, kernel_size=(3, 3), strides=(1, 1), padding='same'),
                            BatchNormalization(axis=-1, epsilon=1e-05, momentum=0.1, center=True, scale=True),
                            ReLU(max_value=None, negative_slope=0.0, threshold=0.0)])
        hidden = block(hidden)

    pooled = GlobalAveragePooling2D(data_format=None)(hidden)
    output_a = Dense(1, activation='linear', name='Adv')(pooled)

    model_a = Model(inputs=input_a, outputs=output_a)
    model_a.compile(loss=adversary_loss_alone, optimizer=adam, metrics=['accuracy'])
    return model_a

In [None]:
print('Combined Network')
# Combined
def build_combined(encoder, decoder, adversary):
    """Chain encoder, decoder and adversary into one end-to-end model.

    The adversary is marked non-trainable before the combined model is
    compiled. The combined model maps (cover image, message planes) to three
    outputs: the encoded image, the decoder output for that image and the
    adversary output for that image.

    Args:
        encoder: model taking [image, message planes].
        decoder: model applied to the encoder output.
        adversary: model applied to the encoder output.

    Returns:
        Keras Model compiled with one loss per output, weighted by
        [lamI, 1.0, lamG].
    """
    adversary.trainable = False

    cover_in = Input(shape=(H, W, C))
    message_in = Input(shape=(L, H, W))

    encoded = encoder([cover_in, message_in])
    decoded = decoder(encoded)
    verdict = adversary(encoded)

    combined = Model(inputs=[cover_in, message_in],
                     outputs=[encoded, decoded, verdict])
    combined.compile(loss=[encoder_loss, decoder_loss, adversary_loss],
                     loss_weights=[lamI, 1.0, lamG],
                     optimizer=adam,
                     metrics=['accuracy', 'binary_accuracy'])
    return combined

In [None]:
print('Combined Model')
# Build the three sub-networks first, then wire them into the end-to-end model.
encoder, decoder, adversary = build_encoder(), build_decoder(), build_adversary()
combined = build_combined(encoder, decoder, adversary)
combined.summary()

# The metric names define the index layout of the score lists that
# train_on_batch / evaluate return later on.
print('', 'Metrics', sep='\n')
print(combined.metrics_names)

In [None]:
def DataGenerator(file_list, folder_path, messages, index, batch_size=12):
    """Load batch number `index` of images together with its messages.

    Args:
        file_list: image file names, relative to `folder_path`.
        folder_path: folder prefix that is prepended to each file name.
        messages: per-image bit sequences, aligned with `file_list` by position.
        index: zero-based batch number.
        batch_size: number of images per batch.

    Returns:
        Tuple of three arrays:
            images - pixel data reshaped to (H, W, C), float32, divided by 255
            message planes - output of `gen_msg_plates` for each image
            message vectors - output of `gen_msg` for each image
    """
    start = index * batch_size
    positions = np.arange(len(file_list))[start:start + batch_size]

    images, batch_msg, dec_msg = [], [], []
    for position in positions:
        pixels = io.imread(folder_path + file_list[position]).reshape([H, W, C])
        images.append(np.array(pixels, dtype='float32') / 255)

        bits = messages[position]
        batch_msg.append(gen_msg_plates(bits))
        dec_msg.append(gen_msg(bits))

    return np.array(images), np.array(batch_msg), np.array(dec_msg)

def d_bit_accuracy(true, pred):
    """Fraction of bits that agree after rounding both inputs to the nearest integer.

    Args:
        true: array-like of reference bit values, e.g. shape (batch, bits).
        pred: array-like of predicted bit values with the same shape as `true`.

    Returns:
        Python float in [0, 1]; 0.0 when the input contains no bits.

    Raises:
        ValueError: if `true` and `pred` have different shapes.
    """
    true_bits = np.round(np.asarray(true, dtype='float64'))
    pred_bits = np.round(np.asarray(pred, dtype='float64'))
    if true_bits.shape != pred_bits.shape:
        raise ValueError('true and pred must have the same shape, got {} and {}'.format(
            true_bits.shape, pred_bits.shape))

    # The denominator is the number of bits actually compared, not the global
    # message length, so the result stays correct for any message size.
    if true_bits.size == 0:
        return 0.0
    return float(np.count_nonzero(true_bits == pred_bits)) / true_bits.size

In [None]:
# To remove the tf warning ;)
# Casts a throwaway zero-valued variable to int32 once, before training starts.
# NOTE(review): `a` is not read anywhere else in the visible notebook; the
# warning this is meant to silence is not identified here - confirm it is
# still needed.
a = tf.to_int32(tf.Variable(0))

In [None]:
# Per-epoch training history (encoder / decoder / adversary).
e_loss, d_loss, a_loss = ([] for _ in range(3))
e_acc, d_acc, a_acc = ([] for _ in range(3))

# Per-epoch validation history (encoder / decoder / adversary).
v_e_loss, v_d_loss, v_a_loss = ([] for _ in range(3))
v_e_acc, v_d_acc, v_a_acc = ([] for _ in range(3))

# Validation bit accuracy of the decoder.
v_db_acc = []

# Total (weighted) loss per epoch.
train_loss, val_loss = ([] for _ in range(2))

# Batches per epoch as floats; the loops truncate them with int(), so a
# trailing partial batch is not visited.
tbatches = len(train_list) / batch_size
vbatches = len(validate_list) / batch_size

In [None]:
def _mean_scores(scores):
    """Average the per-batch score lists collected during one epoch.

    Each element of `scores` is the list returned by `combined.train_on_batch`
    or `combined.evaluate`. Position 0 is read as the total loss and positions
    1-3 as the per-output losses. Positions 4, 7 and 8 are read as the encoder,
    decoder and adversary accuracies; this layout is assumed to match the
    printed `combined.metrics_names` - confirm if the compile() metrics change.

    NaN values of the total loss and of the adversary loss are counted as 0.

    Returns:
        (total loss, encoder loss, decoder loss, adversary loss,
         encoder accuracy, decoder accuracy, adversary accuracy)
    """
    def _zero_if_nan(value):
        return 0 if math.isnan(value) else value

    return (np.mean([_zero_if_nan(s[0]) for s in scores]),
            np.mean([s[1] for s in scores]),
            np.mean([s[2] for s in scores]),
            np.mean([_zero_if_nan(s[3]) for s in scores]),
            np.mean([s[4] for s in scores]),
            np.mean([s[7] for s in scores]),
            np.mean([s[8] for s in scores]))


print('Training Complete Network')
for epoch in range(n_epochs):
    print('\nEpoch : ', epoch+1, '/', n_epochs, '\t\t\t', '[', end='')

    com_score = []
    v_score = []

    # Shuffle Data after each Epoch
    train_list, message_train = shuffle(train_list, message_train)

    tstartt = time.time()
    for nbatch in range(int(tbatches)):
        if nbatch%12 == 0:
            print('#', end='')

        # Cover images, message planes and message vectors for this batch.
        r1, r2, r3 = DataGenerator(train_list, train_folder, message_train, nbatch)

        # Encoded images produced by the current encoder.
        fake_images = encoder.predict([r1, r2])

        # Adversary batch: cover images labelled 0, encoded images labelled 1.
        X = np.concatenate([r1, fake_images])
        y_dis = np.concatenate([np.zeros((len(r1), 1)), np.ones((len(fake_images), 1))])

        # Train discriminator
        adversary.trainable = True
        adversary.train_on_batch(X, y_dis)

        # Train Combined
        y_gen = np.ones((len(r1), 1))
        adversary.trainable = False
        com_score.append(combined.train_on_batch([r1, r2], [r1, r3, y_gen]))

    tendt = time.time()
    print('|', end='')
    vstartt = time.time()

    # Shuffle Data after each Epoch
    validate_list, message_validate = shuffle(validate_list, message_validate)

    # Validate Model
    for nbatch in range(int(vbatches)):
        if nbatch%12 == 0:
            print('#', end='')
        v1, v2, v3 = DataGenerator(validate_list, validate_folder, message_validate, nbatch)

        y_gen = np.ones((len(v1), 1))
        v_score.append(combined.evaluate([v1, v2], [v1, v3, y_gen], verbose=0))

    vendt = time.time()
    print(']')

    # Calculating Metrics
    # Each batch contributes exactly once to every average. Previously the
    # total training loss was appended twice per batch (once NaN-guarded, once
    # raw), which defeated the NaN guard.
    train_histories = (train_loss, e_loss, d_loss, a_loss, e_acc, d_acc, a_acc)
    for history, value in zip(train_histories, _mean_scores(com_score)):
        history.append(value)

    val_histories = (val_loss, v_e_loss, v_d_loss, v_a_loss, v_e_acc, v_d_acc, v_a_acc)
    for history, value in zip(val_histories, _mean_scores(v_score)):
        history.append(value)

    # Report the entries just appended ([-1]) rather than indexing by `epoch`,
    # so the output stays correct if this cell is re-run without resetting the
    # history lists.
    print()
    print('Training   - ', tendt - tstartt, 'sec')
    print('Train     | Loss  : {:10.7f}\t |'.format(train_loss[-1]))
    print('Encoder   | Loss  : {:10.7f}'.format(e_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(e_acc[-1]))
    print('Decoder   | Loss  : {:10.7f}'.format(d_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(d_acc[-1]))
    print('Adversary | Loss  : {:10.7f}'.format(a_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(a_acc[-1]))
    print()
    print('Validation - ', vendt - vstartt, 'sec')
    print('Validate  | Loss  : {:10.7f}\t |'.format(val_loss[-1]))
    print('Encoder   | Loss  : {:10.7f}'.format(v_e_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(v_e_acc[-1]))
    print('Decoder   | Loss  : {:10.7f}'.format(v_d_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(v_d_acc[-1]))
    print('Adversary | Loss  : {:10.7f}'.format(v_a_loss[-1]), end='\t')
    print(' | Accuracy : {:10.7f} |'.format(v_a_acc[-1]))

    # Save Complete Model
    combined.save(modelpath + 'complete-closs-' + str(L) + '-' + str(epoch) + '.h5')

    # Print Images: first three validation images (top row) and their encoded
    # versions (bottom row).
    p1, p2, _ = DataGenerator(validate_list, validate_folder, message_validate, 0, 3)
    out_img = encoder.predict([p1, p2])

    fig, ax = plt.subplots(2, 3, sharex='col', sharey='row')
    plt.tight_layout()
    for j in range(3):
        ax[0, j].imshow((p1[j]*255).astype(np.uint8))
        # The encoder output is unbounded; clip to [0, 1] so out-of-range
        # values do not wrap around when cast to uint8.
        ax[1, j].imshow((np.clip(out_img[j], 0.0, 1.0)*255).astype(np.uint8))
        ax[0, j].autoscale(False)
        ax[1, j].autoscale(False)
    plt.show()
    # Release the figure so figures do not accumulate over the epochs.
    plt.close(fig)

In [None]:
def _plot_history(series, title, ylabel, legend, filename):
    """Draw the given curves on one figure, save it to `filename` and show it."""
    for values in series:
        plt.plot(values)
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('Epoch')
    plt.legend(legend, loc='upper left')
    plt.savefig(filename, dpi=300, pad_inches=0.1)
    plt.show()


# Plot Encoder Accuracy
_plot_history([e_acc, v_e_acc],
              'Encoder Accuracy - Complete ' + str(L),
              'Accuracy',
              ['Enc Acc', 'Val Enc Acc'],
              modelpath + 'complete-closs-' + str(L) + '-enc-acc.png')

# Plot Decoder Accuracy (training, validation and validation bit accuracy)
_plot_history([d_acc, v_d_acc, v_db_acc],
              'Decoder Accuracy - Complete ' + str(L),
              'Accuracy',
              ['Dec Acc', 'Val Dec Acc', 'Val Dec Bit Acc'],
              modelpath + 'complete-closs-' + str(L) + '-dec-acc.png')

# Plot Adversary Accuracy
_plot_history([a_acc, v_a_acc],
              'Adversary Accuracy - Complete ' + str(L),
              'Accuracy',
              ['Adv Acc', 'Val Adv Acc'],
              modelpath + 'complete-closs-' + str(L) + '-adv-acc.png')

# Plot Encoder Loss
_plot_history([e_loss, v_e_loss],
              'Encoder Loss - Complete ' + str(L),
              'Loss',
              ['Enc Loss', 'Val Enc Loss'],
              modelpath + 'complete-closs-' + str(L) + '-enc-loss.png')

# Plot Decoder Loss
_plot_history([d_loss, v_d_loss],
              'Decoder Loss - Complete ' + str(L),
              'Loss',
              ['Dec Loss', 'Val Dec Loss'],
              modelpath + 'complete-closs-' + str(L) + '-dec-loss.png')

# Plot Adversary Loss
_plot_history([a_loss, v_a_loss],
              'Adversary Loss - Complete ' + str(L),
              'Loss',
              ['Adv Loss', 'Val Adv Loss'],
              modelpath + 'complete-closs-' + str(L) + '-adv-loss.png')

In [None]:
def _best_epoch(values, mode):
    """Locate the best entry of a per-epoch history list.

    Args:
        values: sequence of per-epoch numbers.
        mode: 'max' to find the largest value, 'min' to find the smallest.

    Returns:
        (index, value) of the best entry. Ties resolve to the latest epoch,
        and NaN entries never win a comparison. For an empty sequence the
        result is index 0 with the sentinel value (-inf for 'max', +inf for
        'min').
    """
    # Infinite sentinels instead of the former fixed 0 / 1: a loss history
    # whose values all exceed 1 would otherwise never update the minimum.
    best_idx = 0
    best_val = float('-inf') if mode == 'max' else float('inf')
    for idx, val in enumerate(values):
        is_better = val >= best_val if mode == 'max' else val <= best_val
        if is_better:
            best_idx, best_val = idx, val
    return best_idx, best_val


# Find the Best Accuracy of Models
print('Maximum Validation Accuracy - ')

# Encoder: epoch of highest validation accuracy, then of lowest validation loss.
e_max, maxacc = _best_epoch(v_e_acc, 'max')
e_min, minloss = _best_epoch(v_e_loss, 'min')
print('Encoder   : {:10.7f} | Index : {}'.format(maxacc, e_max))
print('Encoder   : {:10.7f} | Index : {}'.format(minloss, e_min))

# Decoder
d_max, maxacc = _best_epoch(v_d_acc, 'max')
d_min, minloss = _best_epoch(v_d_loss, 'min')
print('Decoder   : {:10.7f} | Index : {}'.format(maxacc, d_max))
print('Decoder   : {:10.7f} | Index : {}'.format(minloss, d_min))

# Adversary
a_max, maxacc = _best_epoch(v_a_acc, 'max')
a_min, minloss = _best_epoch(v_a_loss, 'min')
print('Adversary : {:10.7f} | Index : {}'.format(maxacc, a_max))
print('Adversary : {:10.7f} | Index : {}'.format(minloss, a_min))

In [None]:
# Load Best Model
print('Loading Best Model', end='')
# Checkpoints are written by the training loop as 'complete-closs-<L>-<epoch>.h5';
# the previous 'complete-com-' prefix never matched a saved file.
bmodel = 'complete-closs-' + str(L) + '-' + str(e_min)
# The model was compiled with custom loss functions, which load_model can only
# resolve when they are supplied through custom_objects.
custom_losses = {'encoder_loss': encoder_loss,
                 'decoder_loss': decoder_loss,
                 'adversary_loss': adversary_loss,
                 'adversary_loss_alone': adversary_loss_alone}
l_model = load_model(modelpath + bmodel + '.h5', custom_objects=custom_losses)
# Keep writing the .hdf5 copy that this cell has always produced.
l_model.save(modelpath + bmodel + '.hdf5')
print(' - Done')

ttbatches = len(test_list)/batch_size
# Per-batch evaluate() results of the final model (fscore) and of the loaded
# best checkpoint (lscore), plus the matching decoder bit accuracies.
fscore, lscore = [], []
b_dbit_acc, l_dbit_acc = [], []

print('Testing Models - ', end='')
for nbatch in range(int(ttbatches)):
    if nbatch%12 == 0:
        print('#', end='')
    t1, t2, t3 = DataGenerator(test_list, test_folder, message_test, nbatch)

    y_gen = np.ones((len(t1), 1))

    fscore.append(combined.evaluate([t1, t2], [t1, t3, y_gen], verbose=0))
    lscore.append(l_model.evaluate([t1, t2], [t1, t3, y_gen], verbose=0))

    res_v, d_out, _ = combined.predict([t1, t2])
    b_dbit_acc.append(d_bit_accuracy(t3, d_out))
    res_v, d_out, _ = l_model.predict([t1, t2])
    l_dbit_acc.append(d_bit_accuracy(t3, d_out))

print(' - Done')

# Accuracy positions inside each evaluate() result. They mirror the indices the
# training loop reads (4 = encoder, 7 = decoder, 8 = adversary) and are assumed
# to match the printed combined.metrics_names - confirm if metrics change.
ENC_ACC, DEC_ACC, ADV_ACC = 4, 7, 8

ef_score = [sample[ENC_ACC] for sample in fscore]
df_score = [sample[DEC_ACC] for sample in fscore]
af_score = [sample[ADV_ACC] for sample in fscore]

lef_score = [sample[ENC_ACC] for sample in lscore]
ldf_score = [sample[DEC_ACC] for sample in lscore]
laf_score = [sample[ADV_ACC] for sample in lscore]

print()
print('Accuracy  |    Final   |     Best   |')
print('-'*9, '+', '-'*10, '+', '-'*10, '|')
print('Encoder   | {:10.8f} | {:10.8f} |'.format(np.mean(ef_score), np.mean(lef_score)))
print('Decoder   | {:10.8f} | {:10.8f} |'.format(np.mean(df_score), np.mean(ldf_score)))
print('Adversary | {:10.8f} | {:10.8f} |'.format(np.mean(af_score), np.mean(laf_score)))
print('Decoder b | {:10.8f} | {:10.8f} |'.format(np.mean(b_dbit_acc), np.mean(l_dbit_acc)))

In [None]:
# Save Images
def save_images(dfolder, rfolder, file_list, messages, bmodel):
    """Encode every image of a split and write the results as JPEG files.

    Args:
        dfolder: destination folder prefix (created if it does not exist).
        rfolder: folder prefix the source images are read from.
        file_list: image file names; also used as the output file names.
        messages: per-image bit sequences, aligned with `file_list` by position.
        bmodel: model whose predict() returns three outputs, the first being
            the encoded images.

    Images are processed in batches of the notebook-level `batch_size`,
    including a trailing partial batch.
    """
    if dfolder and not os.path.isdir(dfolder):
        os.makedirs(dfolder)

    # Batch count comes from the list passed in (previously the global
    # test_list); rounding up keeps the last partial batch.
    n_batches = (len(file_list) + batch_size - 1) // batch_size
    for index in range(n_batches):
        print(index, end=' ')
        file_list_temp = file_list[index*batch_size : (index+1)*batch_size]
        message_temp = messages[index*batch_size : (index+1)*batch_size]

        images = []
        batch_msg = []
        for i, ID in enumerate(file_list_temp):
            img = io.imread(rfolder + ID)
            img = img.reshape([H, W, C])
            # Same preprocessing as DataGenerator: float32 pixels in [0, 1].
            images.append(np.array(img, dtype='float32')/255)
            batch_msg.append(gen_msg_plates(message_temp[i]))

        output, _, _ = bmodel.predict([np.array(images), np.array(batch_msg)])

        for i in range(len(output)):
            # Map the [0, 1] range back to [0, 255], the inverse of the input
            # scaling. Clipping replaces the former division by the image
            # maximum, which rescaled every image differently, failed on an
            # all-zero image and let negative values wrap around in uint8.
            pixels = np.clip(np.asarray(output[i], dtype='float64') * 255.0, 0.0, 255.0)
            formatted = np.rint(pixels).astype('uint8')
            final = Image.fromarray(formatted)
            ImageFile.MAXBLOCK = final.size[0] * final.size[1] * 3
            final.save(dfolder + file_list_temp[i], "JPEG", quality=100, optimize=True, progressive=True)

In [None]:
# Encode and save all three splits with the final combined model.
bmodel = combined

_save_jobs = [
    ('Saving Encoded Test Images - ',
     'dataset128/complete-' + str(L) + '-eda/test/',
     test_folder, test_list, message_test),
    ('Saving Encoded Validate Images - ',
     'dataset128/complete-' + str(L) + '-eda/validate/',
     validate_folder, validate_list, message_validate),
    ('Saving Encoded Train Images - ',
     'dataset128/complete-' + str(L) + '-eda/train/',
     train_folder, train_list, message_train),
]

for banner, out_dir, in_dir, names, bit_rows in _save_jobs:
    print(banner, end='')
    save_images(out_dir, in_dir, names, bit_rows, bmodel)
    print(' - Done')