# Ye-Net CNN Input TPU

In [None]:
# Mount Google Drive at /content/drive so files stored there are reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive')

## Libraries

In [None]:
from scipy import misc, ndimage, signal
from sklearn.model_selection  import train_test_split
import numpy
import numpy as np
import random
import ntpath
import os
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.colors as colors
from keras import optimizers 
from keras import regularizers
import tensorflow as tf
import cv2
from keras import backend as K
from time import time
import time as tm
import datetime
from operator import itemgetter
import glob
from skimage.util.shape import view_as_blocks
from keras.utils import np_utils
from keras.utils import to_categorical

## 30 SRM filters

In [None]:
# ------------------------------------------------------------------ 30 SRM filters
# Kernels are read from a local .npy file; Ye_Net uses them as the fixed
# (non-trainable) weights of its first convolution.
srm_weights = np.load('SRM_Kernels.npy')
# One bias value per filter, all initialised to 1.
biasSRM = np.ones(30)
print(srm_weights.shape)

# ------------------------------------------------------------------ activation
# Scale factor applied to tanh (the original notebook labels this section "TLU").
T3 = 3


def Tanh3(x):
    """Return ``tanh(x)`` scaled by the module-level constant ``T3``.

    ``T3`` is looked up at call time, so rebinding it changes the output range.
    """
    return K.tanh(x) * T3

## TPU

In [None]:
# TPU runtime setup. Reference material:
#https://www.tensorflow.org/guide/tpu
#https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/guide/tpu.ipynb
#https://colab.research.google.com/notebooks/tpu.ipynb#scrollTo=_pQCOmISAQBu
%tensorflow_version 2.x
import tensorflow as tf
print("Tensorflow version " + tf.__version__)

# Locate the TPU. This cell treats a ValueError from the resolver as
# "no TPU attached to this runtime" and aborts with an explanatory message.
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()  # TPU detection
  print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
except ValueError:
  # NOTE(review): BaseException is not caught by `except Exception` handlers;
  # RuntimeError would be the conventional type here.
  raise BaseException('ERROR: Not connected to a TPU runtime; please see the previous cell in this notebook for instructions!')

# Connect to the TPU cluster, initialise it, and create the distribution
# strategy whose scope() is used wherever Ye_Net() is instantiated below.
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
tpu_strategy = tf.distribute.TPUStrategy(tpu)

## Ye-Net architecture

In [None]:
def Ye_Net(img_size=256):
    """Build and compile the Ye-Net model used throughout this notebook.

    The first convolution is initialised from the module-level ``srm_weights``
    / ``biasSRM`` arrays, is frozen (``trainable=False``) and uses ``Tanh3`` as
    its activation. It is followed by four Conv -> ReLU -> abs -> BatchNorm
    stacks (with spatial dropout, channel concatenation and average pooling in
    between) and a dense head ending in a 2-unit softmax.

    Parameters
    ----------
    img_size : int, default 256
        Height and width of the single-channel input images.

    Returns
    -------
    tf.keras.Model
        Model compiled with SGD (learning rate 0.005, momentum 0.95),
        ``binary_crossentropy`` loss and the ``accuracy`` metric.

    Notes
    -----
    The intermediate tensor shapes are printed while the graph is built.
    """
    #tf.keras.backend.clear_session()
    keras_layers = tf.keras.layers

    def _l2():
        # A fresh regularizer instance per use, as in the original layer definitions.
        return tf.keras.regularizers.l2(0.0001)

    def _relu(tensor):
        # The original code called a bare `ReLU`, which is never imported in this
        # notebook and raised NameError; the fully-qualified Keras layer is used.
        return keras_layers.ReLU(negative_slope=0.1, threshold=0)(tensor)

    def _conv_stack(tensor, filters, kernel_size):
        # Conv -> ReLU (negative_slope=0.1) -> absolute value -> batch normalisation.
        tensor = keras_layers.Conv2D(
            filters, kernel_size, strides=(1,1),
            kernel_initializer='glorot_normal',
            kernel_regularizer=_l2(),
            bias_regularizer=_l2())(tensor)
        tensor = _relu(tensor)
        tensor = keras_layers.Lambda(tf.keras.backend.abs)(tensor)
        return keras_layers.BatchNormalization(
            momentum=0.2, epsilon=0.001, center=True, scale=False,
            trainable=True, fused=None, renorm=False, renorm_clipping=None,
            renorm_momentum=0.4, adjustment=None)(tensor)

    def _dense(tensor, units):
        # Regularised fully-connected layer followed by the same ReLU variant.
        tensor = keras_layers.Dense(
            units,
            kernel_initializer='glorot_normal',
            kernel_regularizer=_l2(),
            bias_regularizer=_l2())(tensor)
        return _relu(tensor)

    #Inputs
    inputs = tf.keras.Input(shape=(img_size,img_size,1), name="input_1")
    print(inputs.shape)

    #Block 1: frozen SRM filter bank
    layers = keras_layers.Conv2D(30, (5,5), weights=[srm_weights,biasSRM], strides=(1,1), trainable=False, activation=Tanh3, use_bias=True)(inputs)
    print(layers.shape)

    #Block 2
    layers = _conv_stack(layers, 30, (3,3))
    # The same tensor is concatenated with itself three times along the channel axis.
    layers = keras_layers.Concatenate()([layers, layers, layers])
    print(layers.shape)

    #Block 3
    layers = keras_layers.SpatialDropout2D(rate=0.1)(layers)
    layers = _conv_stack(layers, 30, (3,3))
    print(layers.shape)

    #Block 4
    layers = keras_layers.SpatialDropout2D(rate=0.1)(layers)
    layers = _conv_stack(layers, 30, (3,3))
    layers = keras_layers.AveragePooling2D((2,2), strides= (2,2))(layers)
    print(layers.shape)

    #Block 5
    layers = keras_layers.SpatialDropout2D(rate=0.1)(layers)
    layers = _conv_stack(layers, 32, (5,5))
    print(layers.shape)

    #Block 6: classification head
    layers = keras_layers.Concatenate()([layers, layers, layers])
    layers = keras_layers.GlobalAveragePooling2D(data_format="channels_last")(layers)
    layers = _dense(layers, 128)
    layers = _dense(layers, 64)
    layers = _dense(layers, 32)
    predictions = keras_layers.Dense(2,kernel_initializer='glorot_normal', activation="softmax", name="output_1",kernel_regularizer=_l2(),bias_regularizer=_l2())(layers)
    print(predictions.shape)

    #Model generation
    model = tf.keras.Model(inputs = inputs, outputs=predictions)
    #Optimizer
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.005, momentum=0.95)#lrate
    #Compilation
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])

    print ("Ye-net model 2 generated")
    return model

## Defining different functions to work with the architecture

In [None]:

def Final_Results_Valid(PATH_trained_models):
    """Evaluate every saved checkpoint in a directory on the validation set.

    Each entry of ``PATH_trained_models`` (in sorted order, skipping entries
    named ``train`` or ``validation``) is loaded into a Ye-Net model and
    evaluated on the global ``X_valid`` / ``y_valid``. Per-checkpoint results
    are printed, followed by the checkpoint with the highest accuracy.

    Parameters
    ----------
    PATH_trained_models : str
        Directory containing the weight files written by ``trainTPU``.

    Side effects
    ------------
    Rebinds the globals ``AccValid`` and ``LossValid`` to lists holding one
    accuracy / loss value per evaluated checkpoint.
    """
    global AccValid
    global LossValid
    AccValid = []
    LossValid = []

    # Build the network once: load_weights() replaces the weights in place, so a
    # new model per checkpoint is unnecessary.
    with tpu_strategy.scope(): # creating the model in the TPUStrategy scope places it on the TPU
        _model = Ye_Net()

    best = None  # (accuracy, loss, filename) of the best checkpoint so far
    for filename in sorted(os.listdir(PATH_trained_models)):
        if filename in ('train', 'validation'):
            continue
        print(filename)
        _model.load_weights(PATH_trained_models+'/'+filename)
        loss,accuracy = _model.evaluate(X_valid, y_valid, verbose=0)
        print(f'Loss={loss:.4f} y Accuracy={accuracy:0.4f}'+'\n')

        AccValid.append(accuracy)
        LossValid.append(loss)

        # Strict '>' keeps the earliest checkpoint on ties.
        if best is None or accuracy > best[0]:
            best = (accuracy, loss, filename)

    if best is None:
        # Previously this path raised UnboundLocalError on B_name / B_loss.
        print("\n\nNo checkpoints found in "+PATH_trained_models)
        return

    B_accuracy, B_loss, B_name = best
    print("\n\nBest")
    print(B_name)
    print(f'Loss={B_loss:.4f} y Accuracy={B_accuracy:0.4f}'+'\n')

In [None]:
def Final_Results_Train(PATH_trained_models):
    """Evaluate every saved checkpoint in a directory on the training set.

    Each entry of ``PATH_trained_models`` (in sorted order, skipping entries
    named ``train`` or ``validation``) is loaded into a Ye-Net model and
    evaluated on the global ``X_train`` / ``y_train``. Per-checkpoint results
    are printed, followed by the checkpoint with the highest accuracy.

    Parameters
    ----------
    PATH_trained_models : str
        Directory containing the weight files written by ``trainTPU``.

    Side effects
    ------------
    Rebinds the globals ``AccTrain`` and ``LossTrain`` to lists holding one
    accuracy / loss value per evaluated checkpoint.
    """
    global AccTrain
    global LossTrain
    AccTrain = []
    LossTrain = []

    # Build the network once: load_weights() replaces the weights in place, so a
    # new model per checkpoint is unnecessary.
    with tpu_strategy.scope(): # creating the model in the TPUStrategy scope places it on the TPU
        _model = Ye_Net()

    best = None  # (accuracy, loss, filename) of the best checkpoint so far
    for filename in sorted(os.listdir(PATH_trained_models)):
        if filename in ('train', 'validation'):
            continue
        print(filename)
        _model.load_weights(PATH_trained_models+'/'+filename)
        loss,accuracy = _model.evaluate(X_train, y_train, verbose=0)
        print(f'Loss={loss:.4f} y Accuracy={accuracy:0.4f}'+'\n')

        AccTrain.append(accuracy)
        LossTrain.append(loss)

        # Strict '>' keeps the earliest checkpoint on ties.
        if best is None or accuracy > best[0]:
            best = (accuracy, loss, filename)

    if best is None:
        # Previously this path raised UnboundLocalError on B_name / B_loss.
        print("\n\nNo checkpoints found in "+PATH_trained_models)
        return

    B_accuracy, B_loss, B_name = best
    print("\n\nBest")
    print(B_name)
    print(f'Loss={B_loss:.4f} y Accuracy={B_accuracy:0.4f}'+'\n')

In [None]:
def Final_Results_Test(PATH_trained_models):
    """Evaluate every saved checkpoint in a directory on the test set.

    Each entry of ``PATH_trained_models`` (in sorted order, skipping entries
    named ``train`` or ``validation``) is loaded into a Ye-Net model and
    evaluated on the global ``X_test`` / ``y_test``. Per-checkpoint results
    are printed, followed by the checkpoint with the highest accuracy.

    Parameters
    ----------
    PATH_trained_models : str
        Directory containing the weight files written by ``trainTPU``.

    Side effects
    ------------
    Rebinds the globals ``AccTest`` and ``LossTest`` to lists holding one
    accuracy / loss value per evaluated checkpoint.

    Notes
    -----
    NOTE(review): ``X_test`` / ``y_test`` must already exist as globals; none of
    the data-loading cells in this notebook assigns them.
    """
    global AccTest
    global LossTest
    AccTest = []
    LossTest = []

    # Build the network once: load_weights() replaces the weights in place, so a
    # new model per checkpoint is unnecessary.
    with tpu_strategy.scope(): # creating the model in the TPUStrategy scope places it on the TPU
        _model = Ye_Net()

    best = None  # (accuracy, loss, filename) of the best checkpoint so far
    for filename in sorted(os.listdir(PATH_trained_models)):
        if filename in ('train', 'validation'):
            continue
        print(filename)
        _model.load_weights(PATH_trained_models+'/'+filename)
        loss,accuracy = _model.evaluate(X_test, y_test, verbose=0)
        print(f'Loss={loss:.4f} y Accuracy={accuracy:0.4f}'+'\n')

        AccTest.append(accuracy)
        LossTest.append(loss)

        # Strict '>' keeps the earliest checkpoint on ties.
        if best is None or accuracy > best[0]:
            best = (accuracy, loss, filename)

    if best is None:
        # Previously this path raised UnboundLocalError on B_name / B_loss.
        print("\n\nNo checkpoints found in "+PATH_trained_models)
        return

    B_accuracy, B_loss, B_name = best
    print("\n\nBest")
    print(B_name)
    print(f'Loss={B_loss:.4f} y Accuracy={B_accuracy:0.4f}'+'\n')

In [None]:
def _plot_metric_history(series, title, ylabel, out_prefix):
    """Draw one Train/Validation/Test figure and save it as .eps, .svg and .pdf.

    ``series`` is a sequence of ``(initial_value, per_epoch_values)`` pairs in
    Train, Validation, Test order; the initial value is prepended to each curve.
    ``out_prefix`` is the output path without the file extension.
    """
    plt.figure(figsize=(10, 10))
    for initial_value, history in series:
        plt.plot(np.concatenate([np.array([initial_value]),np.array(history)],axis=0))
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation', 'Test'], loc='upper left')
    plt.grid(True)
    for extension in ('eps', 'svg', 'pdf'):
        plt.savefig(out_prefix+'.'+extension, format=extension)
    plt.show()


def graphics(AccTest, AccTrain, AccValid, LossTest, LossTrain, LossValid, model_name, path_img_base):
    """Plot accuracy and loss against epoch for the train/validation/test sets.

    A freshly built Ye-Net model (no weights are loaded into it) is evaluated
    on the global ``X_test``/``y_test``, ``X_train``/``y_train`` and
    ``X_valid``/``y_valid``; those values become the first point of each curve,
    followed by the per-checkpoint values passed in.

    Parameters
    ----------
    AccTest, AccTrain, AccValid : sequence of float
        Per-checkpoint accuracies (as produced by the ``Final_Results_*`` functions).
    LossTest, LossTrain, LossValid : sequence of float
        Per-checkpoint losses.
    model_name : str
        Name used for the output sub-directory and in the figure file names.
    path_img_base : str
        Directory under which ``<model_name>/`` is created.

    Side effects
    ------------
    Creates ``path_img_base/model_name`` if needed and writes
    ``Accuracy_Ye_Net_<model_name>`` and ``Loss_Ye_Net_<model_name>`` figures
    there in eps, svg and pdf formats; also shows both figures.
    """
    out_dir = path_img_base+"/"+model_name
    # exist_ok avoids the separate existence check and is safe if the directory is already there.
    os.makedirs(out_dir, exist_ok=True)

    with tpu_strategy.scope(): # creating the model in the TPUStrategy scope places it on the TPU
        model = Ye_Net()

    # verbose=0 (silent), matching the Final_Results_* functions.
    lossTEST,accuracyTEST   = model.evaluate(X_test, y_test,verbose=0)
    lossTRAIN,accuracyTRAIN = model.evaluate(X_train, y_train,verbose=0)
    lossVALID,accuracyVALID = model.evaluate(X_valid, y_valid,verbose=0)

    # NOTE(review): newer matplotlib releases may spell this style name differently — confirm for your version.
    with plt.style.context('seaborn-white'):
        _plot_metric_history(
            [(accuracyTRAIN, AccTrain), (accuracyVALID, AccValid), (accuracyTEST, AccTest)],
            'Accuracy Vs Epoch', 'Accuracy',
            out_dir+'/Accuracy_Ye_Net_'+model_name)
        _plot_metric_history(
            [(lossTRAIN, LossTrain), (lossVALID, LossValid), (lossTEST, LossTest)],
            'Loss Vs Epoch', 'Loss',
            out_dir+'/Loss_Ye_Net_'+model_name)

In [None]:
def top_models(AccTest,AccTrain,AccValid):
    """Print the best-scoring epochs for the test, train and validation sets.

    For each list the top ``int(len(list) * 0.05)`` accuracies are printed in
    descending order together with their 1-based epoch number. Sections are
    printed in Test, Train, Validation order, separated by an empty line.
    Lists with fewer than 20 entries therefore print nothing.
    """
    sections = (
        ("Test Accuracy {}, epoch:{}\n", AccTest),
        ("Train Accuracy {}, epoch:{}\n", AccTrain),
        ("Validation Accuracy {}, epoch:{}\n", AccValid),
    )
    for position, (template, accuracies) in enumerate(sections):
        if position:
            # Separator between consecutive sections (none after the last one).
            print("")
        ranked = sorted(enumerate(accuracies), key=lambda pair: pair[1], reverse=True)
        top_k = int(len(accuracies)*(0.05)) #5% total epochs
        for epoch_index, accuracy in ranked[:top_k]:
            print(template.format(accuracy, epoch_index+1))

In [None]:
def trainTPU(path_model, epochs, model_Name):
    """Train Ye-Net on the TPU, saving the weights after every epoch.

    The model is fitted one epoch at a time on the global ``X_train`` /
    ``y_train`` (validated on ``X_valid`` / ``y_valid``) with a batch size of
    256; after each epoch the weights are written to
    ``<path_model>/<model_Name>/<epoch, zero-padded to 4 digits>.hdf5``.

    Parameters
    ----------
    path_model : str
        Base directory for checkpoints.
    epochs : int
        Number of single-epoch ``fit`` calls to run.
    model_Name : str
        Sub-directory name for this run; also stored in the global ``model_name``.
    """
    global model_name
    started_at = tm.time()
    model_name = model_Name

    checkpoint_dir = path_model+'/'+model_Name
    if not os.path.exists(checkpoint_dir):
        os.makedirs(checkpoint_dir)

    with tpu_strategy.scope(): # creating the model in the TPUStrategy scope places it on the TPU
        model = Ye_Net()

    for epoch in range(1, epochs+1):
        print("epoch ",epoch)
        # One epoch per call so a checkpoint can be written between epochs.
        model.fit(X_train,y_train,validation_data=(X_valid,y_valid), batch_size=128*2, epochs=1, verbose=1)
        model.save_weights(checkpoint_dir+'/'+str(epoch).zfill(4)+'.hdf5', overwrite=True)

    TIME = tm.time() - started_at
    print("Time "+model_name+" = %s [seconds]" % TIME)

## Working with BOSSbase 1.01, WOW and payload = 0.4 bpp

In the README, there is a link to download the databases we use for the work. There is BOSSbase 1.01, cover images and stego. The steganographic algorithms used in the paper are WOW and S-UNIWARD, with a payload of 0.4bpp.

Note: If you want to train the algorithm with S-UNIWARD 0.4 bpp, change "PATH04_WOW1" and  "base_name".

# Different - CNN Input

To speed up the learning process and avoid issues with GPU memory limitations, CNN optimization is performed over batches of images rather than the complete training set. This dataset division means that the class distribution within each batch of images affects the learning process. To demonstrate the effect of the cover/stego image mix within a batch on the learning process, and to find the best way of feeding the images to the network, three different approaches were tested:




- Usual (providing all the cover images, then all the stego images)


- Random (random positions of cover and stego images)


- Order (alternates cover and stego images)

Note: Any of the nine "CNN Input" configurations below can be used to train your model. You can also download the databases in PGM format through a link found in the README.

## [1] Train image distribution "Random", and Validation image distribution "Usual"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)


# Input layout [1]: training set shuffled ("Random"), validation set left as
# all cover images followed by all stego images ("Usual").
#Train Images
Xc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Yc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Ys = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

# Stack cover samples first, then stego samples.
X = (numpy.vstack((Xc, Xs)))
Y = (numpy.vstack((Yc, Ys)))


# Integer labels in the same order: 0 = cover, 1 = stego.
Xt = (numpy.hstack(([0]*len(Xc), [1]*len(Xs))))
Yt = (numpy.hstack(([0]*len(Yc), [1]*len(Ys))))


# One-hot encode the labels (2 classes).
Xt = np_utils.to_categorical(Xt, 2)
Yt = np_utils.to_categorical(Yt, 2)


####random train
# The same permutation is applied to images and labels so they stay aligned.
# NOTE(review): no random seed is set, so the order differs between runs.
idx=np.arange(len(X))
random.shuffle(idx)
X=X[idx]
Xt=Xt[idx]

X=np.rollaxis(X,1,4)  #channel axis shifted to last axis
Y=np.rollaxis(Y,1,4)  #channel axis shifted to last axis


# Publish under the global names read by trainTPU and the Final_Results_* functions.
X_train=X
y_train=Xt
X_valid=Y
y_valid=Yt

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


## [2] Train image distribution "Random", and Validation image distribution "Random"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)


# Input layout [2]: both the training set and the validation set are shuffled ("Random").
#Train Images
Xc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Yc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Ys = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')


# Stack cover samples first, then stego samples.
X = (numpy.vstack((Xc, Xs)))
Y = (numpy.vstack((Yc, Ys)))


# Integer labels in the same order: 0 = cover, 1 = stego.
Xt = (numpy.hstack(([0]*len(Xc), [1]*len(Xs))))
Yt = (numpy.hstack(([0]*len(Yc), [1]*len(Ys))))


# One-hot encode the labels (2 classes).
Xt = np_utils.to_categorical(Xt, 2)
Yt = np_utils.to_categorical(Yt, 2)


####random train
# The same permutation is applied to images and labels so they stay aligned.
# NOTE(review): no random seed is set, so the order differs between runs.
idx1=np.arange(len(X))
random.shuffle(idx1)
X=X[idx1]
Xt=Xt[idx1]

####random valid
idx2=np.arange(len(Y))
random.shuffle(idx2)
Y=Y[idx2]
Yt=Yt[idx2]



X=np.rollaxis(X,1,4)  #channel axis shifted to last axis
Y=np.rollaxis(Y,1,4)  #channel axis shifted to last axis


# Publish under the global names read by trainTPU and the Final_Results_* functions.
X_train=X
y_train=Xt
X_valid=Y
y_valid=Yt

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


## [3] Train image distribution "Order", and Validation image distribution "Order"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

def CS_X(X):
    """Return a copy of ``X`` with its two halves interleaved along axis 0.

    Even positions receive samples from the first half (0, 1, 2, ...) and odd
    positions receive samples starting at index ``int(len(X)/2)``. Only channel
    0 of the last axis is rewritten, and the final position keeps its original
    content. ``X`` itself is not modified.
    """
    interleaved = X.copy()
    total = len(X)
    second_half_start = int(total/2)
    stop = total - 1  # the last position is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # positions 0, 2, 4, ... below stop
    odd_count = stop // 2         # positions 1, 3, 5, ... below stop
    interleaved[0:stop:2, :, :, 0] = X[0:even_count, :, :, 0]
    interleaved[1:stop:2, :, :, 0] = X[second_half_start:second_half_start + odd_count, :, :, 0]
    return interleaved

def CS_y(y):
    """Return a copy of ``y`` with its two halves interleaved along axis 0.

    Even rows receive rows from the first half (0, 1, 2, ...) and odd rows
    receive rows starting at index ``int(len(y)/2)``. The final row keeps its
    original content. ``y`` itself is not modified.
    """
    interleaved = y.copy()
    total = len(y)
    second_half_start = int(total/2)
    stop = total - 1  # the last row is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # rows 0, 2, 4, ... below stop
    odd_count = stop // 2         # rows 1, 3, 5, ... below stop
    interleaved[0:stop:2, :] = y[0:even_count, :]
    interleaved[1:stop:2, :] = y[second_half_start:second_half_start + odd_count, :]
    return interleaved

##################################################################################################################

# Input layout [3]: both the training set and the validation set alternate
# cover and stego samples ("Order").
#Train Images
Xc0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Xc1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Xs1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

##################################################################################################################

# Labels: 0 for every cover sample then 1 for every stego sample, one-hot
# encoded, then interleaved by CS_y.
y_train = CS_y(np_utils.to_categorical((numpy.hstack(([0]*len(Xc0), [1]*len(Xs0)))), 2))
y_valid = CS_y(np_utils.to_categorical((numpy.hstack(([0]*len(Xc1), [1]*len(Xs1)))), 2))


# Images: cover stacked before stego, channel axis moved last, then interleaved
# by CS_X the same way as the labels.
X_train = CS_X(np.rollaxis((numpy.vstack((Xc0, Xs0))),1,4))
X_valid = CS_X(np.rollaxis((numpy.vstack((Xc1, Xs1))),1,4))


print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


 ## [4] Train image distribution "Usual", and Validation image distribution "Usual"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)


# Input layout [4]: both sets are left as all cover images followed by all
# stego images ("Usual"); nothing is shuffled or interleaved.
#Train Images
Xc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Yc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Ys = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

# Stack cover samples first, then stego samples.
X = (numpy.vstack((Xc, Xs)))
Y = (numpy.vstack((Yc, Ys)))


# Integer labels in the same order: 0 = cover, 1 = stego.
Xt = (numpy.hstack(([0]*len(Xc), [1]*len(Xs))))
Yt = (numpy.hstack(([0]*len(Yc), [1]*len(Ys))))


# One-hot encode the labels (2 classes).
Xt = np_utils.to_categorical(Xt, 2)
Yt = np_utils.to_categorical(Yt, 2)


X=np.rollaxis(X,1,4)  #channel axis shifted to last axis
Y=np.rollaxis(Y,1,4)  #channel axis shifted to last axis


# Publish under the global names read by trainTPU and the Final_Results_* functions.
X_train=X
y_train=Xt
X_valid=Y
y_valid=Yt

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


 ## [5] Train image distribution "Order", and Validation image distribution "Usual"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

def CS_X(X):
    """Return a copy of ``X`` with its two halves interleaved along axis 0.

    Even positions receive samples from the first half (0, 1, 2, ...) and odd
    positions receive samples starting at index ``int(len(X)/2)``. Only channel
    0 of the last axis is rewritten, and the final position keeps its original
    content. ``X`` itself is not modified.
    """
    interleaved = X.copy()
    total = len(X)
    second_half_start = int(total/2)
    stop = total - 1  # the last position is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # positions 0, 2, 4, ... below stop
    odd_count = stop // 2         # positions 1, 3, 5, ... below stop
    interleaved[0:stop:2, :, :, 0] = X[0:even_count, :, :, 0]
    interleaved[1:stop:2, :, :, 0] = X[second_half_start:second_half_start + odd_count, :, :, 0]
    return interleaved

def CS_y(y):
    """Return a copy of ``y`` with its two halves interleaved along axis 0.

    Even rows receive rows from the first half (0, 1, 2, ...) and odd rows
    receive rows starting at index ``int(len(y)/2)``. The final row keeps its
    original content. ``y`` itself is not modified.
    """
    interleaved = y.copy()
    total = len(y)
    second_half_start = int(total/2)
    stop = total - 1  # the last row is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # rows 0, 2, 4, ... below stop
    odd_count = stop // 2         # rows 1, 3, 5, ... below stop
    interleaved[0:stop:2, :] = y[0:even_count, :]
    interleaved[1:stop:2, :] = y[second_half_start:second_half_start + odd_count, :]
    return interleaved

##################################################################################################################

# Input layout [5]: training set alternates cover and stego samples ("Order");
# validation set is left as all cover followed by all stego ("Usual").
#Train Images
Xc0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Xc1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Xs1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

##################################################################################################################

# Labels: 0 = cover, 1 = stego, one-hot encoded; only the training labels are interleaved.
y_train = CS_y(np_utils.to_categorical((numpy.hstack(([0]*len(Xc0), [1]*len(Xs0)))), 2))
y_valid = np_utils.to_categorical((numpy.hstack(([0]*len(Xc1), [1]*len(Xs1)))), 2)

# Images: cover stacked before stego, channel axis moved last; only the training images are interleaved.
X_train = CS_X(np.rollaxis((numpy.vstack((Xc0, Xs0))),1,4))
X_valid = np.rollaxis((numpy.vstack((Xc1, Xs1))),1,4)


print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


 ## [6] Train image distribution "Order", and Validation image distribution "Random"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

def CS_X(X):
    """Return a copy of ``X`` with its two halves interleaved along axis 0.

    Even positions receive samples from the first half (0, 1, 2, ...) and odd
    positions receive samples starting at index ``int(len(X)/2)``. Only channel
    0 of the last axis is rewritten, and the final position keeps its original
    content. ``X`` itself is not modified.
    """
    interleaved = X.copy()
    total = len(X)
    second_half_start = int(total/2)
    stop = total - 1  # the last position is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # positions 0, 2, 4, ... below stop
    odd_count = stop // 2         # positions 1, 3, 5, ... below stop
    interleaved[0:stop:2, :, :, 0] = X[0:even_count, :, :, 0]
    interleaved[1:stop:2, :, :, 0] = X[second_half_start:second_half_start + odd_count, :, :, 0]
    return interleaved

def CS_y(y):
    """Return a copy of ``y`` with its two halves interleaved along axis 0.

    Even rows receive rows from the first half (0, 1, 2, ...) and odd rows
    receive rows starting at index ``int(len(y)/2)``. The final row keeps its
    original content. ``y`` itself is not modified.
    """
    interleaved = y.copy()
    total = len(y)
    second_half_start = int(total/2)
    stop = total - 1  # the last row is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # rows 0, 2, 4, ... below stop
    odd_count = stop // 2         # rows 1, 3, 5, ... below stop
    interleaved[0:stop:2, :] = y[0:even_count, :]
    interleaved[1:stop:2, :] = y[second_half_start:second_half_start + odd_count, :]
    return interleaved

##################################################################################################################

# Input layout [6]: training set alternates cover and stego samples ("Order");
# validation set is shuffled ("Random").
#Train Images
Xc0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Yc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Ys = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

##################################################################################################################

# Training data: labels (0 = cover, 1 = stego, one-hot) and images are
# interleaved the same way by CS_y / CS_X.
y_train = CS_y(np_utils.to_categorical((numpy.hstack(([0]*len(Xc0), [1]*len(Xs0)))), 2))
X_train = CS_X(np.rollaxis((numpy.vstack((Xc0, Xs0))),1,4))

# Validation data: cover stacked before stego.
# (A stray `Z = numpy.vstack((Zc, Zs))` used to follow here; Zc/Zs are never
# defined, so it raised NameError, and Z was not used afterwards.)
Y = (numpy.vstack((Yc, Ys)))

Yt = (numpy.hstack(([0]*len(Yc), [1]*len(Ys))))


Yt = np_utils.to_categorical(Yt, 2)

####Random valid
# The same permutation is applied to images and labels so they stay aligned.
# NOTE(review): no random seed is set, so the order differs between runs.
idx2=np.arange(len(Y))
random.shuffle(idx2)
Y=Y[idx2]
Yt=Yt[idx2]

Y=np.rollaxis(Y,1,4)  #channel axis shifted to last axis

X_valid=Y
y_valid=Yt


print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


 ## [7] Train image distribution "Random", and Validation image distribution "Order"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

def CS_X(X):
    """Return a copy of ``X`` with its two halves interleaved along axis 0.

    Even positions receive samples from the first half (0, 1, 2, ...) and odd
    positions receive samples starting at index ``int(len(X)/2)``. Only channel
    0 of the last axis is rewritten, and the final position keeps its original
    content. ``X`` itself is not modified.
    """
    interleaved = X.copy()
    total = len(X)
    second_half_start = int(total/2)
    stop = total - 1  # the last position is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # positions 0, 2, 4, ... below stop
    odd_count = stop // 2         # positions 1, 3, 5, ... below stop
    interleaved[0:stop:2, :, :, 0] = X[0:even_count, :, :, 0]
    interleaved[1:stop:2, :, :, 0] = X[second_half_start:second_half_start + odd_count, :, :, 0]
    return interleaved

def CS_y(y):
    """Return a copy of ``y`` with its two halves interleaved along axis 0.

    Even rows receive rows from the first half (0, 1, 2, ...) and odd rows
    receive rows starting at index ``int(len(y)/2)``. The final row keeps its
    original content. ``y`` itself is not modified.
    """
    interleaved = y.copy()
    total = len(y)
    second_half_start = int(total/2)
    stop = total - 1  # the last row is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # rows 0, 2, 4, ... below stop
    odd_count = stop // 2         # rows 1, 3, 5, ... below stop
    interleaved[0:stop:2, :] = y[0:even_count, :]
    interleaved[1:stop:2, :] = y[second_half_start:second_half_start + odd_count, :]
    return interleaved

##################################################################################################################

# Input layout [7] (heading: train "Random", validation "Order").
#Train Images
Xc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Xc1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Xs1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')


##################################################################################################################

# Validation data: labels 0 = cover, 1 = stego (one-hot); images stacked cover
# first with the channel axis moved last.
# NOTE(review): the heading says the validation set is "Order", but CS_X / CS_y
# are not applied here, so it stays all-cover-then-all-stego — confirm intent.
y_valid = np_utils.to_categorical((numpy.hstack(([0]*len(Xc1), [1]*len(Xs1)))), 2)
X_valid = np.rollaxis((numpy.vstack((Xc1, Xs1))),1,4)


# Training data: stack cover then stego, build one-hot labels.
X = (numpy.vstack((Xc, Xs)))
Xt = (numpy.hstack(([0]*len(Xc), [1]*len(Xs))))
Xt = np_utils.to_categorical(Xt, 2)
####Random train
# The same permutation is applied to images and labels so they stay aligned.
# NOTE(review): no random seed is set, so the order differs between runs.
idx1=np.arange(len(X))
random.shuffle(idx1)
X=X[idx1]
Xt=Xt[idx1]
X=np.rollaxis(X,1,4)  #channel axis shifted to last axis
X_train=X
y_train=Xt

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)

 ## [8] Train image distribution "Usual", and Validation image distribution "Random"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

# Input layout [8]: training set left as all cover followed by all stego
# ("Usual"); validation set shuffled ("Random").
#Train Images
Xc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Yc = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Ys = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')


# Stack cover samples first, then stego samples.
X = (numpy.vstack((Xc, Xs)))
Y = (numpy.vstack((Yc, Ys)))


# Integer labels in the same order: 0 = cover, 1 = stego.
Xt = (numpy.hstack(([0]*len(Xc), [1]*len(Xs))))
Yt = (numpy.hstack(([0]*len(Yc), [1]*len(Ys))))


# One-hot encode the labels (2 classes).
Xt = np_utils.to_categorical(Xt, 2)
Yt = np_utils.to_categorical(Yt, 2)


####random valid
# The same permutation is applied to images and labels so they stay aligned.
# NOTE(review): no random seed is set, so the order differs between runs.
idx2=np.arange(len(Y))
random.shuffle(idx2)
Y=Y[idx2]
Yt=Yt[idx2]


X=np.rollaxis(X,1,4)  #channel axis shifted to last axis
Y=np.rollaxis(Y,1,4)  #channel axis shifted to last axis


# Publish under the global names read by trainTPU and the Final_Results_* functions.
X_train=X
y_train=Xt
X_valid=Y
y_valid=Yt

print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)


 ## [9] Train image distribution "Usual", and Validation image distribution "Order"

In [None]:
n=256
def load_images(path_pattern):
    """Load all images matching ``path_pattern`` and split them into ``n`` x ``n`` blocks.

    Parameters
    ----------
    path_pattern : str
        Glob pattern selecting the image files.

    Returns
    -------
    numpy.ndarray
        One entry per block. Each block is wrapped in a one-element list, so
        the result has shape ``(num_blocks, 1, n, n)``.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no files.
    ValueError
        If a matched file cannot be decoded by OpenCV.
    """
    # glob returns matches in arbitrary order; sorting makes the sample order
    # reproducible between runs and machines.
    files = sorted(glob.glob(path_pattern))
    if not files:
        raise FileNotFoundError("No images match pattern: " + str(path_pattern))

    blocks_out = []
    for f in files:
        image = cv2.imread(f, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread reports failure by returning None instead of raising.
            raise ValueError("Could not read image: " + f)
        blocks = view_as_blocks(image, (n, n))
        blocks_out.extend(
            [blocks[i, j]]
            for i in range(blocks.shape[0])
            for j in range(blocks.shape[1])
        )
    return numpy.array(blocks_out)

def CS_X(X):
    """Return a copy of ``X`` with its two halves interleaved along axis 0.

    Even positions receive samples from the first half (0, 1, 2, ...) and odd
    positions receive samples starting at index ``int(len(X)/2)``. Only channel
    0 of the last axis is rewritten, and the final position keeps its original
    content. ``X`` itself is not modified.
    """
    interleaved = X.copy()
    total = len(X)
    second_half_start = int(total/2)
    stop = total - 1  # the last position is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # positions 0, 2, 4, ... below stop
    odd_count = stop // 2         # positions 1, 3, 5, ... below stop
    interleaved[0:stop:2, :, :, 0] = X[0:even_count, :, :, 0]
    interleaved[1:stop:2, :, :, 0] = X[second_half_start:second_half_start + odd_count, :, :, 0]
    return interleaved

def CS_y(y):
    """Return a copy of ``y`` with its two halves interleaved along axis 0.

    Even rows receive rows from the first half (0, 1, 2, ...) and odd rows
    receive rows starting at index ``int(len(y)/2)``. The final row keeps its
    original content. ``y`` itself is not modified.
    """
    interleaved = y.copy()
    total = len(y)
    second_half_start = int(total/2)
    stop = total - 1  # the last row is never rewritten
    if stop <= 0:
        return interleaved

    even_count = (stop + 1) // 2  # rows 0, 2, 4, ... below stop
    odd_count = stop // 2         # rows 1, 3, 5, ... below stop
    interleaved[0:stop:2, :] = y[0:even_count, :]
    interleaved[1:stop:2, :] = y[second_half_start:second_half_start + odd_count, :]
    return interleaved

##################################################################################################################

# Input layout [9]: training set left as all cover followed by all stego
# ("Usual"); validation set alternates cover and stego samples ("Order").
#Train Images
Xc0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/cover/*.pgm')
Xs0 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/train/stego/*.pgm')
#Validation Images
Xc1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/cover/*.pgm')
Xs1 = load_images('/DATABASES/BOSSbase-1.01/WOW/0.4bpp/valid/stego/*.pgm')

##################################################################################################################

# Labels: 0 = cover, 1 = stego, one-hot encoded; only the validation labels are interleaved.
y_train = np_utils.to_categorical((numpy.hstack(([0]*len(Xc0), [1]*len(Xs0)))), 2)
y_valid = CS_y(np_utils.to_categorical((numpy.hstack(([0]*len(Xc1), [1]*len(Xs1)))), 2))

# Images: cover stacked before stego, channel axis moved last. The validation
# images are interleaved with CS_X (the image helper) so they line up with the
# labels above; the label helper CS_y was previously applied here.
X_train = np.rollaxis((numpy.vstack((Xc0, Xs0))),1,4)
X_valid = CS_X(np.rollaxis((numpy.vstack((Xc1, Xs1))),1,4))


print(X_train.shape)
print(y_train.shape)
print(X_valid.shape)
print(y_valid.shape)

## Training

In [None]:
# Output locations: checkpoints go under path_model, figures under path_img_base.
path_model = "./WOW/logs"
path_img_base = "./Image/WOW/images"

# Run name: used as the checkpoint sub-directory and read again by the
# evaluation and plotting cells below.
model_Name = "Ye_Net..."

# Pass the variable rather than repeating the literal, so training and the
# later evaluation cells always refer to the same directory.
trainTPU(path_model=path_model, epochs=150, model_Name=model_Name)

## Test

In [None]:
# Evaluate every saved checkpoint of this run on the test set; sets the AccTest / LossTest globals.
# NOTE(review): Final_Results_Test reads X_test / y_test, which no cell in this notebook assigns —
# a test split must be loaded first.
Final_Results_Test(path_model+"/"+model_Name) 

## Valid

In [None]:
# Evaluate every saved checkpoint of this run on the validation set; sets the AccValid / LossValid globals.
Final_Results_Valid(path_model+"/"+model_Name) 

## Train

In [None]:
# Evaluate every saved checkpoint of this run on the training set; sets the AccTrain / LossTrain globals.
Final_Results_Train(path_model+"/"+model_Name) 

## graphics

In [None]:
# Plot accuracy and loss curves. The Acc* / Loss* globals are created by the three
# Final_Results_* cells above, so those must have been run first.
graphics(AccTest, AccTrain, AccValid, LossTest, LossTrain, LossValid, model_Name, path_img_base)

Note: If you want to train the algorithm with S-UNIWARD 0.4 bpp, change "PATH04_WOW1" and  "base_name".