In [None]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [None]:
import glob
import random
list_files = []
list_file_pattern = ["tiny-imagenet-200/train/*/*/*.JPEG"]
for file_pattern in list_file_pattern:
     sub_list_files = glob.glob(file_pattern)
     list_files = [*list_files, *sub_list_files]
#changed
random.seed(10)
random.shuffle(list_files)
#list_files = list_files[:5000]

N_TEST_SAMPLES = int(0.1 * len(list_files))
N_TRAIN_SAMPLES= len(list_files) - N_TEST_SAMPLES
list_files_train, list_files_val = list_files[:N_TRAIN_SAMPLES], list_files[N_TRAIN_SAMPLES:]

print("Total number of image:", len(list_files))
print("Total number of training:", N_TRAIN_SAMPLES)
print("Total number of testing:", N_TEST_SAMPLES)

Total number of image: 100000
Total number of training: 90000
Total number of testing: 10000


Check Out the Dataset


```
# This is formatted as code
```



dataset.py

In [None]:
import numpy as np
import cv2

import keras
 
class ImageDataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_files, dim, std, mean, batch_size=64, shuffle=True):
        'Initialization'
        
        self.list_files = list_files
        self.dim = dim
        self.std = std
        self.mean = mean
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Denotes the number of batches per epoch'
        return int(np.floor(len(self.list_files) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        list_files_batch = [self.list_files[k] for k in indexes]

        # Generate data
        X, y = self.__data_generation(list_files_batch)

        return X, y

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_files))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)

    def normalize_batch(self, imgs):
        imgs = np.array(imgs, dtype=np.float32)
        imgs /= 255
        return (imgs -  self.mean) / self.std
                                                            
    def denormalize_batch(self, imgs, should_clip=True):
        imgs= imgs * self.std + self.mean 
        if should_clip:
            imgs = np.clip(imgs,0,1)
        imgs *= 255
        return imgs

    def __data_generation(self, list_files_batch):
        'Generates data containing batch_size samples' 
        X = []
        for i, fname in enumerate(list_files_batch):
            img = cv2.imread(fname)
            img = cv2.resize(img, (self.dim[1], self.dim[0]))
            # print(img.shape)
            X.append(img)
        X = self.normalize_batch(X)
        mid = int(len(X) / 2)
        secret = X[:mid]
        cover = X[mid:]
        return [secret, cover], [cover, secret]

model.py

In [None]:
import keras
import numpy as np
import os
import pickle

import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import tensorflow as tf
from tensorflow.compat.v1.keras.backend import set_session #changed
from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D, concatenate, Input
from keras.layers.noise import GaussianNoise
from keras.callbacks import ModelCheckpoint
from keras.optimizers import Adadelta, RMSprop



def prepare_network(input_shape, input_tensor):
    # first block
    first_3x3_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_4x4_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_5x5_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu')
    ])(input_tensor)
    
    first_tensor = concatenate([first_3x3_conv, first_4x4_conv, first_5x5_conv], axis=3)

    # second blockGaussianNoise
    second_3x3_conv = Conv2D(filters = 50, kernel_size = (3, 3), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_4x4_conv = Conv2D(filters = 50, kernel_size = (4, 4), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_5x5_conv = Conv2D(filters = 50, kernel_size = (5, 5), 
                            padding = 'Same', activation = 'relu')(first_tensor)                                                

    second_tensor = concatenate([second_3x3_conv, second_4x4_conv, second_5x5_conv], axis=3, name='prepare_net_out')

    return second_tensor


def hidding_network(input_shape, input_tensor):
    # first block
    first_3x3_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_4x4_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_5x5_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu')
    ])(input_tensor)
    
    first_tensor = concatenate([first_3x3_conv, first_4x4_conv, first_5x5_conv], axis=3)

    # second block
    second_3x3_conv = Conv2D(filters = 50, kernel_size = (3, 3), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_4x4_conv = Conv2D(filters = 50, kernel_size = (4, 4), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_5x5_conv = Conv2D(filters = 50, kernel_size = (5, 5), 
                            padding = 'Same', activation = 'relu')(first_tensor)                                                

    second_tensor = concatenate([second_3x3_conv, second_4x4_conv, second_5x5_conv], axis=3)

    # convert to 3 channels image
    out = Conv2D(filters = 3, kernel_size = (1, 1), padding = 'Same', 
                activation = 'relu', name='hidding_net_out')(second_tensor) 
                
    # add noise
    out_noise = GaussianNoise(stddev=0.1, name='hidding_net_out_noise')(out)

    return out, out_noise

def reveal_network(input_shape, input_tensor):
    # first block
    first_3x3_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (3, 3),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_4x4_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (4, 4),padding = 'Same', activation = 'relu')
    ])(input_tensor)

    first_5x5_conv = Sequential([
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu', input_shape = input_shape),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu'),
        Conv2D(filters = 50, kernel_size = (5, 5),padding = 'Same', activation = 'relu')
    ])(input_tensor)
    
    first_tensor = concatenate([first_3x3_conv, first_4x4_conv, first_5x5_conv], axis=3)

    # second block
    second_3x3_conv = Conv2D(filters = 50, kernel_size = (3, 3), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_4x4_conv = Conv2D(filters = 50, kernel_size = (4, 4), 
                            padding = 'Same', activation = 'relu')(first_tensor)

    second_5x5_conv = Conv2D(filters = 50, kernel_size = (5, 5), 
                            padding = 'Same', activation = 'relu')(first_tensor)                                                

    second_tensor = concatenate([second_3x3_conv, second_4x4_conv, second_5x5_conv], axis=3)

    # convert to 3 channels image
    out = Conv2D(filters = 3, kernel_size = (1, 1), padding = 'Same', 
                activation = 'relu', name = 'reveal_net_out')(second_tensor) 
    
    return out

def net(img_shape):
    # prepare network
    secret_img = Input(shape=img_shape)
    prepare_net_input_shape = img_shape
    prepare_net_out = prepare_network(prepare_net_input_shape, secret_img)

    # hidding network
    cover_img = Input(shape=img_shape)
    hidding_net_input = concatenate([cover_img, prepare_net_out], axis=3)
    hidding_net_input_shape = (*img_shape[:2], 153)
    hidding_net_out, hidding_net_out_noise = hidding_network(hidding_net_input_shape, hidding_net_input)

    reveal_net_input_shape = img_shape
    reveal_net_out = reveal_network(reveal_net_input_shape, hidding_net_out)

    model = Model(inputs=[secret_img, cover_img], outputs=[hidding_net_out, reveal_net_out])
    return model

Train.py

In [None]:
import numpy as np
import os
import pickle
import glob
import random
import pandas as pd
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

import tensorflow as tf
import keras
from keras import backend as K
from tensorflow.compat.v1.keras.backend import set_session #changed
from keras.callbacks import ModelCheckpoint, TensorBoard, ReduceLROnPlateau
from keras.optimizers import Adadelta, RMSprop
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPool2D
from keras.utils import multi_gpu_model

# from model import *
# from dataset import ImageDataGenerator 

config = tf.compat.v1.ConfigProto() #changed
config.gpu_options.allow_growth = True
set_session(tf.compat.v1.Session(config=config))

list_files = []
list_file_pattern = ["tiny-imagenet-200/train/*/*/*.JPEG"]
for file_pattern in list_file_pattern:
    sub_list_files = glob.glob(file_pattern)
    list_files = [*list_files, *sub_list_files]

# def get_file():
#     data = pd.read_csv('shape.csv')
#     fname, x, y = data.iloc[:, 0], data.iloc[:, 1], data.iloc[:, 2]
#     list_files = []
#     for i in range(len(fname)):
#         rate = y[i] / x[i]
#         if 1.0 < rate and rate < 1.4:
#             list_files.append(fname[i])
#     return list_files
# list_files = get_file()
random.seed(10)
random.shuffle(list_files)
# list_files =list_files[:5000]

N_TEST_SAMPLES = int(0.1 * len(list_files))
N_TRAIN_SAMPLES= len(list_files) - N_TEST_SAMPLES
list_files_train, list_files_val = list_files[:N_TRAIN_SAMPLES], list_files[N_TRAIN_SAMPLES:]

print("Total number of image:", len(list_files))
print("Total number of training:", N_TRAIN_SAMPLES)
print("Total number of testing:", N_TEST_SAMPLES)

N_EPOCHS = 100
BATCH_SIZE = 64
BETA = 0.8
params = {
    'dim': (64, 64, 3),
    'std': np.array([0.229, 0.224, 0.225]),
    'mean': np.array([0.485, 0.456, 0.406]),
    'batch_size': BATCH_SIZE
}

# Generators
training_generator = ImageDataGenerator(list_files_train, **params, shuffle=True)
validation_generator = ImageDataGenerator(list_files_val, **params, shuffle=True)

# config to run with multi-GPU

'''
with tf.device("/cpu:0"):
    model = net(params['dim'])
model = multi_gpu_model(model, gpus=2)
'''
model = net(params['dim']) #changed


model.load_weights("/content/drive/My Drive/Model/weights-28-2.00.hdf5")
model.summary()

def SSIMLoss(y_true, y_pred):
  return 1 - tf.reduce_mean(tf.image.ssim(y_true, y_pred, 1.0))
#https://stackoverflow.com/questions/57357146/use-ssim-loss-function-with-keras

import math
def PSNR(y_true, y_pred):
    max_pixel = 1.0
    return 10.0 * (1.0 / math.log(10)) * K.log((max_pixel ** 2) / (K.mean(K.square(y_pred -y_true))))

#https://stackoverflow.com/questions/55844618/how-to-calculate-psnr-metric-in-keras
  
model.compile(
    loss={'hidding_net_out': 'mse', 'reveal_net_out': 'mse'},
    optimizer=RMSprop(lr=0.001, rho=0.9, epsilon=1e-08, decay=0.0),
    loss_weights={'hidding_net_out': 1, 'reveal_net_out': BETA},
    metrics=None
)


model_file = "/content/drive/My Drive/Model/weights-{epoch:02d}-{val_loss:.2f}.hdf5"
checkpoint = ModelCheckpoint(model_file, monitor='val_loss', verbose=1)
# tbCallBack = TensorBoard(log_dir='./tensorboard/CNN', write_graph=True, write_images=True)
#learning_rate_reduction = ReduceLROnPlateau(monitor='val_loss', patience=2, verbose=1, factor=0.5, min_lr=0.000001)
callbacks_list = [checkpoint]

model.fit_generator(
        training_generator,
        steps_per_epoch=min(N_TRAIN_SAMPLES // BATCH_SIZE, 500),
        initial_epoch=28,
        epochs=N_EPOCHS,
        validation_data=validation_generator,
        validation_steps=min(N_TEST_SAMPLES // BATCH_SIZE, 50),
        callbacks=callbacks_list,
        verbose=1)


Total number of image: 100000
Total number of training: 90000
Total number of testing: 10000
Model: "functional_9"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_9 (InputLayer)            [(None, 64, 64, 3)]  0                                            
__________________________________________________________________________________________________
sequential_36 (Sequential)      (None, 64, 64, 50)   69050       input_9[0][0]                    
__________________________________________________________________________________________________
sequential_37 (Sequential)      (None, 64, 64, 50)   122600      input_9[0][0]                    
__________________________________________________________________________________________________
sequential_38 (Sequential)      (None, 64, 64, 50)   191450      input_9[0][0]               

<tensorflow.python.keras.callbacks.History at 0x7fd6bd16c2e8>